1. (中繼的存檔) 準備對 mavlinkObject.py 的 mavlink_object 重構

2. 新增 ringBuffer.py 作為 queue 取代
3. 新增 tests 資料夾分離程式與測試檔案
4. asyncioManager.py 是參考用的範例
chiyu
Chiyu Chen 11 months ago
parent 417d9e8f57
commit 7f7753d0b4

@ -0,0 +1,290 @@
import asyncio
import socket
import threading
import time
import logging
from typing import Dict, Any, Optional, List, Callable, Tuple
from pymavlink import mavutil
from dataclasses import dataclass, field
from .ringBuffer import RingBuffer
# 設置日誌
logger = logging.getLogger("async_io_manager")
@dataclass
class Connection:
"""表示一個 MAVLink 連接"""
socket_id: int
socket: Any # pymavlink socket 或相容物件
buffer: RingBuffer = field(default_factory=lambda: RingBuffer(1024))
last_activity: float = field(default_factory=time.time)
is_closed: bool = False
stats: Dict[str, int] = field(default_factory=lambda: {"rx_count": 0, "rx_bytes": 0, "errors": 0})
class AsyncIOManager:
"""
異步 I/O 管理器
負責高效處理所有 MAVLink socket I/O 操作
"""
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if cls._instance is None:
cls._instance = super(AsyncIOManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self.connections: Dict[int, Connection] = {}
self.next_socket_id = 0
self._running = False
self._event_loop = None
self._thread = None
self.check_interval = 0.01 # 檢查間隔,單位:秒
def start(self):
"""啟動異步 I/O 管理器"""
if self._running:
logger.warning("AsyncIOManager already running")
return
self._running = True
self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._thread.start()
logger.info("AsyncIOManager started")
def _run_event_loop(self):
"""在專用執行緒中執行 asyncio 事件循環"""
try:
# 創建新的事件循環
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._event_loop = loop
# 添加主要協程任務
loop.create_task(self._process_all_connections())
# 運行事件循環
loop.run_forever()
except Exception as e:
logger.error(f"Error in AsyncIOManager event loop: {e}")
finally:
if loop.is_running():
loop.stop()
logger.info("AsyncIOManager event loop stopped")
async def _process_all_connections(self):
"""處理所有連接的主循環"""
while self._running:
for socket_id, conn in list(self.connections.items()):
if conn.is_closed:
continue
try:
# 非阻塞模式讀取數據
await asyncio.get_event_loop().run_in_executor(
None, self._read_from_connection, conn
)
except Exception as e:
conn.stats["errors"] += 1
logger.error(f"Error processing connection {socket_id}: {e}")
# 短暫休息,讓出 CPU
await asyncio.sleep(self.check_interval)
def _read_from_connection(self, conn: Connection):
"""從連接讀取數據,此函數在執行器中運行"""
try:
# 使用 mavlink 的非阻塞讀取
msg = conn.socket.recv_msg()
if msg:
# 收到消息,更新統計資訊
conn.last_activity = time.time()
conn.stats["rx_count"] += 1
conn.stats["rx_bytes"] += len(msg.get_msgbuf())
# 將消息放入緩衝區
if not conn.buffer.put(msg):
# 緩衝區已滿,記錄錯誤
logger.warning(f"Buffer full for connection {conn.socket_id}, message dropped")
except Exception as e:
conn.stats["errors"] += 1
if "Connection refused" not in str(e): # 忽略常見的連接拒絕錯誤
logger.error(f"Error reading from socket {conn.socket_id}: {e}")
def stop(self):
"""停止異步 I/O 管理器"""
if not self._running:
return
self._running = False
# 停止事件循環
if self._event_loop:
asyncio.run_coroutine_threadsafe(
self._shutdown_loop(), self._event_loop
)
# 等待執行緒結束
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2.0)
if self._thread.is_alive():
logger.warning("AsyncIOManager thread did not terminate cleanly")
logger.info("AsyncIOManager stopped")
async def _shutdown_loop(self):
"""關閉事件循環"""
# 關閉所有連接
for socket_id in list(self.connections.keys()):
self.remove_connection(socket_id)
# 停止事件循環
loop = asyncio.get_event_loop()
loop.stop()
def register_connection(self, socket: Any) -> int:
"""
註冊新的連接
Args:
socket: pymavlink socket 或相容物件
Returns:
int: 分配的 socket_id
"""
socket_id = self.next_socket_id
self.next_socket_id += 1
# 創建新的連接
connection = Connection(socket_id=socket_id, socket=socket)
self.connections[socket_id] = connection
logger.info(f"Registered new connection with ID {socket_id}")
return socket_id
def remove_connection(self, socket_id: int) -> bool:
"""
移除連接
Args:
socket_id: 連接 ID
Returns:
bool: 成功移除返回 True
"""
if socket_id in self.connections:
conn = self.connections[socket_id]
conn.is_closed = True
try:
# 關閉原始 socket
if hasattr(conn.socket, 'close'):
conn.socket.close()
except Exception as e:
logger.error(f"Error closing socket {socket_id}: {e}")
# 從字典中移除
self.connections.pop(socket_id)
logger.info(f"Removed connection with ID {socket_id}")
return True
return False
def get_buffer(self, socket_id: int) -> Optional[RingBuffer]:
"""
獲取指定連接的緩衝區
Args:
socket_id: 連接 ID
Returns:
RingBuffer None如果連接不存在
"""
if socket_id in self.connections:
return self.connections[socket_id].buffer
return None
def get_stats(self, socket_id: int = None) -> Dict:
"""
獲取連接統計資訊
Args:
socket_id: 指定連接 IDNone 表示所有連接
Returns:
Dict: 統計資訊
"""
if socket_id is not None and socket_id in self.connections:
return self.connections[socket_id].stats
# 彙總所有連接的統計資訊
total_stats = {"rx_count": 0, "rx_bytes": 0, "errors": 0, "connection_count": len(self.connections)}
for conn in self.connections.values():
for key, value in conn.stats.items():
if key in total_stats:
total_stats[key] += value
return total_stats
def send_message(self, socket_id: int, message: Any) -> bool:
"""
發送 MAVLink 訊息
Args:
socket_id: 連接 ID
message: 要發送的訊息
Returns:
bool: 成功發送返回 True
"""
if socket_id not in self.connections:
return False
conn = self.connections[socket_id]
if conn.is_closed:
return False
try:
# 使用 asyncio.run_coroutine_threadsafe 在事件循環中執行 IO 操作
future = asyncio.run_coroutine_threadsafe(
self._send_message_async(conn, message),
self._event_loop
)
# 等待操作完成(可選)
return future.result(timeout=1.0)
except Exception as e:
logger.error(f"Error sending message on socket {socket_id}: {e}")
return False
async def _send_message_async(self, conn: Connection, message: Any) -> bool:
"""在事件循環中異步發送訊息"""
try:
# 調用執行器進行實際發送操作
return await asyncio.get_event_loop().run_in_executor(
None, self._send_message_sync, conn, message
)
except Exception as e:
conn.stats["errors"] += 1
logger.error(f"Async error sending message: {e}")
return False
def _send_message_sync(self, conn: Connection, message: Any) -> bool:
"""同步發送訊息函數,在執行器中運行"""
try:
if hasattr(message, 'get_msgbuf'):
# 如果是 MAVLink 訊息對象
conn.socket.write(message.get_msgbuf())
else:
# 假設是已編碼的字節
conn.socket.write(message)
conn.last_activity = time.time()
return True
except Exception as e:
conn.stats["errors"] += 1
logger.error(f"Error in sync message send: {e}")
return False

@ -2,6 +2,7 @@
import queue
import time
# ROS2 的 import
import rclpy
@ -14,10 +15,16 @@ import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 51
running_time = 30
test_item = 12
running_time = 3000
print('test_item : ', test_item)
'''
測試項 個位數 表示分離的功能
測試項 1X 表示 mavlink_object 的功能 測試連線的能力
'''
if test_item == 1:
# 測試 mavlink_object 中 updateMultiplexingList 的輸出
print('===> Start of Program .Test ', test_item)
@ -140,7 +147,7 @@ elif test_item == 11:
analyzer = mo.mavlink_bridge()
# 創建通道
connection_string="udp:127.0.0.1:14550"
connection_string="udp:127.0.0.1:15551"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object2 = mo.mavlink_object(mavlink_socket)
# mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT
@ -169,6 +176,7 @@ elif test_item == 11:
print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count)
print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print(mavlink_socket.last_seq)
print("===================")
# 結束程式 退出所有 thread
@ -254,6 +262,91 @@ elif test_item == 12:
analyzer.stop()
print('<=== End of Program')
elif test_item == 13:
# 這邊測試看看能否狸貓換太子的方式
# 把原來 pymavlink 的 mavtcpin 換成 unix socket
print('===> Start of Program .Test ', test_item)
import os
import socket
# 建立一個 mavtcpin 實例
import mavunixin
# ===== 以下就是按照原本測試 12 的流程 =====
# 啟動連線的模組
analyzer = mo.mavlink_bridge()
# 初始化輸入通道
mavlink_socket_unix = mavunixin.mavunixin("unix:/tmp/unix_socket_mavlink.sock", source_system=1, source_component=1)
mavlink_object_in = mo.mavlink_object(mavlink_socket_unix)
mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT
# 停留五秒 我要啟動另一邊
time.sleep(5)
# 初始化輸出通道
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = [0]
# 做一個空的通道驗證 可以拿來 debug
mavlink_object_none = mo.mavlink_object(None)
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
# 做點延遲 讓 heartbeat 先吃進來
time.sleep(3)
print("=== connection established! ===")
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
# print(type(mavlink_socket_out))
# print(type(mavlink_socket_out.mav))
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False)
print('none object : ', test)
except queue.Empty:
pass
if (time.time() - show_time) >= 2:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_unix.close()
mavlink_socket_out.close()
analyzer.stop()
print('<=== End of Program')
elif test_item == 20:
# 這邊測試 node 生成 topic 的功能
# 利用 空的通道 發出假的 heartbeat 封包

@ -307,7 +307,6 @@ class mavlink_object():
self.running = False
def _run_thread(self):
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
start_time = time.time()
while self.running:
@ -317,7 +316,6 @@ class mavlink_object():
try:
mavlinkMsg = self.mavlink_socket.recv_msg()
except Exception as e:
logger.critical(f"Receiving data not mavlink format. Object Delete.")
logger.critical(f"Receiving data not mavlink format. Object Delete.")
print(f"[mavlinkObject.py] Error receiving mavlink data: {e}")
print(mavlinkMsg)

@ -0,0 +1,231 @@
# import array
import threading
import ctypes
from typing import Any, Optional, Tuple
class RingBuffer:
"""
高效能無鎖環形緩衝區使用原子操作避免鎖定
用於在不同執行緒間高效傳遞資料
"""
# 緩衝區計數器,用於自動分配 buffer_id
_buffer_counter = 0
_counter_lock = threading.Lock()
def __init__(self, capacity: int = 256, buffer_id: int = None):
"""
初始化環形緩衝區
Args:
capacity: 緩衝區容量 (必須是 2 的次方)
buffer_id: 緩衝區 ID如果為 None 則自動分配
"""
# 確保容量是 2 的次方,便於使用位運算取模
if capacity & (capacity - 1) != 0:
# 找到大於等於 capacity 的最小 2 的次方
capacity = 1 << (capacity - 1).bit_length()
# 分配緩衝區 ID
if buffer_id is None:
with RingBuffer._counter_lock:
buffer_id = RingBuffer._buffer_counter
RingBuffer._buffer_counter += 1
self.buffer_id = buffer_id
self.capacity = capacity
self.mask = capacity - 1 # 用於快速取模
self.buffer = [None] * capacity
# 原子計數器作為讀寫指標
self.write_index = ctypes.c_uint64(0)
self.read_index = ctypes.c_uint64(0)
# 用於檢測上次操作的執行緒 ID
self._last_read_thread = None
self._last_write_thread = None
# 添加同時讀寫統計
self.concurrent_write_count = ctypes.c_uint64(0) # 同時寫入計數
self.concurrent_read_count = ctypes.c_uint64(0) # 同時讀取計數
self.total_write_count = ctypes.c_uint64(0) # 總寫入操作計數
self.total_read_count = ctypes.c_uint64(0) # 總讀取操作計數
self.overflow_count = ctypes.c_uint64(0) # 緩衝區溢出次數
# 記錄各執行緒的操作次數
self.thread_write_counts = {} # {thread_id: count}
self.thread_read_counts = {} # {thread_id: count}
# 用於保護統計數據的鎖(僅用於統計,不影響主要讀寫操作)
self._stats_lock = threading.Lock()
def put(self, item: Any) -> bool:
"""
將項目存入緩衝區
Args:
item: 要存入的項目
Returns:
bool: 成功寫入返回 True緩衝區已滿返回 False
"""
# 更新寫入統計
self.total_write_count.value += 1
# 檢測上次寫入的執行緒
current_thread = threading.get_ident()
# 記錄當前執行緒寫入次數
with self._stats_lock:
self.thread_write_counts[current_thread] = self.thread_write_counts.get(current_thread, 0) + 1
# 檢測是否為不同執行緒同時寫入
if self._last_write_thread is not None and current_thread != self._last_write_thread:
self.concurrent_write_count.value += 1
self._last_write_thread = current_thread
# 原子獲取當前寫入位置
current = self.write_index.value
next_pos = (current + 1) & self.mask
# 檢查緩衝區是否已滿
if next_pos == self.read_index.value:
self.overflow_count.value += 1
return False
# 寫入資料並原子更新寫入位置
self.buffer[current] = item
self.write_index.value = next_pos
return True
def get(self) -> Optional[Any]:
"""
從緩衝區讀取項目
Returns:
項目或 None如果緩衝區為空
"""
# 更新讀取統計
self.total_read_count.value += 1
# 檢測上次讀取的執行緒
current_thread = threading.get_ident()
# 記錄當前執行緒讀取次數
with self._stats_lock:
self.thread_read_counts[current_thread] = self.thread_read_counts.get(current_thread, 0) + 1
# 檢測是否為不同執行緒同時讀取
if self._last_read_thread is not None and current_thread != self._last_read_thread:
self.concurrent_read_count.value += 1
self._last_read_thread = current_thread
# 檢查緩衝區是否為空
if self.read_index.value == self.write_index.value:
return None
# 原子獲取當前讀取位置並讀取資料
current = self.read_index.value
item = self.buffer[current]
# 原子更新讀取位置
self.read_index.value = (current + 1) & self.mask
return item
def get_all(self) -> list:
"""
獲取緩衝區中的所有項目
Returns:
list: 所有可用項目的列表
"""
items = []
while True:
item = self.get()
if item is None:
break
items.append(item)
return items
def size(self) -> int:
# 返回目前緩衝區內項目數量
return (self.write_index.value - self.read_index.value) & self.mask
def is_empty(self) -> bool:
# 檢查緩衝區是否為空
return self.read_index.value == self.write_index.value
def is_full(self) -> bool:
# 檢查緩衝區是否已滿
return ((self.write_index.value + 1) & self.mask) == self.read_index.value
def clear(self) -> None:
"""清空緩衝區"""
self.read_index.value = self.write_index.value
def get_stats(self) -> dict:
"""
獲取緩衝區統計資訊
Returns:
dict: 包含各種統計數據的字典
"""
with self._stats_lock:
stats = {
"buffer_id": self.buffer_id,
"capacity": self.capacity,
"current_size": self.size(),
"is_full": self.is_full(),
"is_empty": self.is_empty(),
"total_writes": self.total_write_count.value,
"total_reads": self.total_read_count.value,
"concurrent_writes": self.concurrent_write_count.value,
"concurrent_reads": self.concurrent_read_count.value,
"overflow_count": self.overflow_count.value,
"write_threads": len(self.thread_write_counts),
"read_threads": len(self.thread_read_counts),
"concurrent_write_ratio": self.concurrent_write_count.value / max(1, self.total_write_count.value),
"concurrent_read_ratio": self.concurrent_read_count.value / max(1, self.total_read_count.value),
"top_writer_threads": sorted(self.thread_write_counts.items(), key=lambda x: x[1], reverse=True)[:3],
"top_reader_threads": sorted(self.thread_read_counts.items(), key=lambda x: x[1], reverse=True)[:3],
}
return stats
def print_stats(self) -> None:
"""輸出當前緩衝區統計資訊"""
stats = self.get_stats()
print(f"\n=== RingBuffer #{stats['buffer_id']} Statistics ===")
print(f"Capacity: {stats['capacity']}, Current Size: {stats['current_size']}")
print(f"Total Write Operations: {stats['total_writes']}")
print(f"Total Read Operations: {stats['total_reads']}")
print(f"Concurrent Write Events: {stats['concurrent_writes']} ({stats['concurrent_write_ratio']:.2%})")
print(f"Concurrent Read Events: {stats['concurrent_reads']} ({stats['concurrent_read_ratio']:.2%})")
print(f"Buffer Overflow Count: {stats['overflow_count']}")
print(f"Writing Threads: {stats['write_threads']}")
print(f"Reading Threads: {stats['read_threads']}")
print("Top Writer Threads:")
for thread_id, count in stats['top_writer_threads']:
print(f" Thread {thread_id}: {count} writes")
print("Top Reader Threads:")
for thread_id, count in stats['top_reader_threads']:
print(f" Thread {thread_id}: {count} reads")
print("=============================\n")
def reset_stats(self) -> None:
"""重置所有統計數據"""
with self._stats_lock:
self.concurrent_write_count.value = 0
self.concurrent_read_count.value = 0
self.total_write_count.value = 0
self.total_read_count.value = 0
self.overflow_count.value = 0
self.thread_write_counts.clear()
self.thread_read_counts.clear()
def __str__(self) -> str:
"""返回緩衝區的字符串表示"""
return f"RingBuffer(id={self.buffer_id}, capacity={self.capacity}, size={self.size()})"

@ -0,0 +1,246 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import time
import threading
import socket
import random
from pymavlink import mavutil
from pymavlink.dialects.v20 import common as mavlink2
from fc_network_adapter.asyncioManager import AsyncIOManager, Connection
class MockMAVLinkSocket:
"""模擬 pymavlink socket 行為的類,用於測試"""
def __init__(self, name="mock", simulate_errors=False):
self.name = name
self.simulate_errors = simulate_errors
self.is_closed = False
self.received_msgs = []
self.seq = 0
def recv_msg(self):
"""模擬接收 MAVLink 消息"""
if self.is_closed:
return None
# 隨機模擬網絡延遲
time.sleep(random.uniform(0.01, 0.05))
# 偶爾模擬錯誤
if self.simulate_errors and random.random() < 0.1:
raise Exception("Simulated network error")
# 70% 的機率返回消息30% 返回 None (模擬無數據可讀)
if random.random() > 0.3:
msg = self._generate_mavlink_message()
self.received_msgs.append(msg)
return msg
return None
def _generate_mavlink_message(self):
"""生成模擬 MAVLink 消息"""
# 創建 MAVLink 實例
mav = mavlink2.MAVLink(self)
# 設置來源系統和組件
mav.srcSystem = random.randint(1, 5)
mav.srcComponent = random.randint(1, 5)
# 創建 HEARTBEAT 消息
msg = mavlink2.MAVLink_heartbeat_message(
type=mavlink2.MAV_TYPE_QUADROTOR,
autopilot=mavlink2.MAV_AUTOPILOT_GENERIC,
base_mode=0,
custom_mode=0,
system_status=0,
mavlink_version=3
)
# 打包消息
msg.pack(mav)
self.seq += 1
return msg
def write(self, data):
"""模擬發送數據"""
if self.is_closed:
raise Exception("Socket is closed")
# 模擬寫入操作
time.sleep(random.uniform(0.001, 0.01))
# 偶爾模擬錯誤
if self.simulate_errors and random.random() < 0.1:
raise Exception("Simulated write error")
return len(data)
def close(self):
"""關閉連接"""
self.is_closed = True
def create_mock_mavlink_sockets(count=3):
"""創建多個模擬 MAVLink socket"""
sockets = []
for i in range(count):
# 每三個 socket 中有一個模擬錯誤
simulate_errors = (i % 3 == 0)
socket = MockMAVLinkSocket(f"mock_{i}", simulate_errors)
sockets.append(socket)
return sockets
def monitor_thread(io_manager):
"""監視 AsyncIOManager 狀態的執行緒"""
print("Starting monitoring thread...")
try:
while True:
# 獲取總體統計資訊
stats = io_manager.get_stats()
print(f"\n=== AsyncIOManager Stats ===")
print(f"Total connections: {stats.get('connection_count', 0)}")
print(f"Total messages received: {stats.get('rx_count', 0)}")
print(f"Total bytes received: {stats.get('rx_bytes', 0)}")
print(f"Total errors: {stats.get('errors', 0)}")
# 獲取每個連接的緩衝區狀態
for socket_id in io_manager.connections:
buffer = io_manager.get_buffer(socket_id)
conn_stats = io_manager.get_stats(socket_id)
if buffer and not io_manager.connections[socket_id].is_closed:
print(f"Connection {socket_id}: Buffer size={buffer.size()}, " +
f"Messages={conn_stats.get('rx_count', 0)}, " +
f"Last activity={time.time() - io_manager.connections[socket_id].last_activity:.1f}s ago")
# 每秒更新一次
time.sleep(1.0)
except KeyboardInterrupt:
print("Monitoring thread stopped.")
def consumer_thread(io_manager, socket_id):
"""從指定連接的緩衝區消費消息"""
print(f"Starting consumer for socket {socket_id}...")
try:
while True:
# 獲取緩衝區
buffer = io_manager.get_buffer(socket_id)
if not buffer:
print(f"Consumer {socket_id}: Buffer not found or connection closed")
break
# 獲取所有消息
messages = buffer.get_all()
if messages:
print(f"Consumer {socket_id}: Received {len(messages)} messages")
for msg in messages:
# 處理每條消息 (這裡只是示例,實際應用中可能有更複雜的處理)
print(f" - Message from SysID={msg.get_srcSystem()}, CompID={msg.get_srcComponent()}, Type={msg.get_type()}")
# 短暫休眠,避免 CPU 空轉
time.sleep(0.2)
except Exception as e:
print(f"Consumer {socket_id} stopped: {e}")
def send_command_thread(io_manager, socket_ids):
"""定期向所有連接發送命令"""
print("Starting command sender thread...")
# 創建一個 MAVLink 實例用於生成消息
mav = mavlink2.MAVLink(None)
mav.srcSystem = 255 # GCS system ID
mav.srcComponent = 0
try:
count = 0
while True:
# 等待 2 秒
time.sleep(2.0)
count += 1
# 創建一個命令消息
msg = mavlink2.MAVLink_command_long_message(
target_system=1,
target_component=1,
command=mavlink2.MAV_CMD_REQUEST_PROTOCOL_VERSION,
confirmation=0,
param1=0, param2=0, param3=0,
param4=0, param5=0, param6=0, param7=0
)
# 向所有連接發送消息
for socket_id in socket_ids:
if socket_id in io_manager.connections and not io_manager.connections[socket_id].is_closed:
try:
# 打包並發送消息
result = io_manager.send_message(socket_id, msg)
print(f"Sent command to socket {socket_id}: {'Success' if result else 'Failed'}")
except Exception as e:
print(f"Error sending to socket {socket_id}: {e}")
# 每 10 次迭代關閉一個連接 (用於演示關閉功能)
if count % 10 == 0 and socket_ids:
socket_id = socket_ids.pop(0)
print(f"Closing connection {socket_id}...")
io_manager.remove_connection(socket_id)
except Exception as e:
print(f"Command sender thread stopped: {e}")
def main():
print("AsyncIOManager Example")
print("======================")
# 創建 AsyncIOManager 實例
io_manager = AsyncIOManager()
# 啟動 I/O 管理器
io_manager.start()
print("AsyncIOManager started")
# 創建模擬 MAVLink socket
mock_sockets = create_mock_mavlink_sockets(5)
socket_ids = []
# 註冊連接
for i, socket in enumerate(mock_sockets):
socket_id = io_manager.register_connection(socket)
socket_ids.append(socket_id)
print(f"Registered socket {i} with ID {socket_id}")
# 啟動監視執行緒
monitor = threading.Thread(target=monitor_thread, args=(io_manager,))
monitor.daemon = True
monitor.start()
# 為每個連接啟動消費者執行緒
consumers = []
for socket_id in socket_ids:
consumer = threading.Thread(target=consumer_thread, args=(io_manager, socket_id))
consumer.daemon = True
consumer.start()
consumers.append(consumer)
# 啟動命令發送執行緒
sender = threading.Thread(target=send_command_thread, args=(io_manager, socket_ids.copy()))
sender.daemon = True
sender.start()
print("\nPress Ctrl+C to exit...")
try:
# 運行一段時間然後退出
time.sleep(30)
except KeyboardInterrupt:
pass
finally:
# 清理資源
print("\nShutting down...")
io_manager.stop()
print("AsyncIOManager stopped")
if __name__ == "__main__":
main()

@ -0,0 +1,152 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import time
import threading
from fc_network_adapter.ringBuffer import RingBuffer
def producer(buffer, count, interval=0.01):
"""生產者:向緩衝區添加資料"""
print(f"Producer started (thread {threading.get_ident()})")
for i in range(count):
# 嘗試寫入數據,直到成功
while not buffer.put(f"Item-{i}"):
print(f"Buffer full, producer waiting... (thread {threading.get_ident()})")
time.sleep(0.1)
print(f"Produced: Item-{i}, buffer size: {buffer.size()}")
time.sleep(interval) # 模擬生產過程
print(f"Producer finished (thread {threading.get_ident()})")
def consumer(buffer, max_items, interval=0.05):
"""消費者:從緩衝區讀取資料"""
print(f"Consumer started (thread {threading.get_ident()})")
items_consumed = 0
while items_consumed < max_items:
# 嘗試讀取數據
item = buffer.get()
if item:
print(f"Consumed: {item}, buffer size: {buffer.size()}")
items_consumed += 1
else:
print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})")
time.sleep(interval) # 模擬消費過程
print(f"Consumer finished (thread {threading.get_ident()})")
def batch_consumer(buffer, interval=0.2):
"""批量消費者:一次性讀取緩衝區中的所有資料"""
print(f"Batch consumer started (thread {threading.get_ident()})")
for _ in range(5): # 執行5次批量讀取
time.sleep(interval) # 等待緩衝區積累數據
items = buffer.get_all()
if items:
print(f"Batch consumed {len(items)} items: {items}")
else:
print("Buffer empty for batch consumer")
print(f"Batch consumer finished (thread {threading.get_ident()})")
def demonstrate_multi_writer():
"""示範多個寫入執行緒同時操作緩衝區"""
print("\n=== Demonstrating Multiple Writers ===")
buffer = RingBuffer(capacity=80)
# 創建多個生產者執行緒
threads = []
for i in range(3):
thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1)))
threads.append(thread)
thread.start()
# 等待所有執行緒完成
for thread in threads:
thread.join()
buffer.print_stats() # 印出統計資訊
# 讀出所有剩餘資料
remaining = buffer.get_all()
print(f"Remaining items in buffer after multiple writers: {remaining}")
def demonstrate_basic_usage():
"""示範基本使用方式"""
print("\n=== Demonstrating Basic Usage ===")
# 創建緩衝區
buffer = RingBuffer(capacity=20, buffer_id=7)
# 檢查初始狀態
print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 添加幾個項目
for i in range(5):
buffer.put(f"Test-{i}")
# 檢查狀態
print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 讀取一個項目
item = buffer.get()
print(f"Read item: {item}")
print(f"After reading 1 item - Content Size: {buffer.size()}")
# 添加更多項目直到滿
items_added = 0
while not buffer.is_full():
buffer.put(f"Fill-{items_added}")
items_added += 1
print(f"Added {items_added} more items until full")
print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 嘗試添加到已滿的緩衝區
result = buffer.put("Overflow")
print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}")
# 獲取所有項目
all_items = buffer.get_all()
print(f"All items in buffer: {all_items}")
print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}")
# 印出統計資訊
buffer.print_stats()
def demonstrate_producer_consumer():
"""示範生產者-消費者模式"""
print("\n=== Demonstrating Producer-Consumer Pattern ===")
buffer = RingBuffer(capacity=16)
# 創建生產者和消費者執行緒
producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1))
consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2))
batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5))
# 啟動執行緒
producer_thread.start()
consumer_thread.start()
batch_thread.start()
# 等待執行緒完成
producer_thread.join()
consumer_thread.join()
batch_thread.join()
# 檢查最終狀態
print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}")
buffer.print_stats()
if __name__ == "__main__":
# 展示各種使用場景
# demonstrate_basic_usage()
# demonstrate_producer_consumer()
demonstrate_multi_writer()
print("\nAll demonstrations completed!")
Loading…
Cancel
Save