diff --git a/src/fc_network_adapter/fc_network_adapter/asyncioManager.py b/src/fc_network_adapter/fc_network_adapter/asyncioManager.py deleted file mode 100644 index 9a9ba65..0000000 --- a/src/fc_network_adapter/fc_network_adapter/asyncioManager.py +++ /dev/null @@ -1,290 +0,0 @@ -import asyncio -import socket -import threading -import time -import logging -from typing import Dict, Any, Optional, List, Callable, Tuple -from pymavlink import mavutil -from dataclasses import dataclass, field -from .ringBuffer import RingBuffer - -# 設置日誌 -logger = logging.getLogger("async_io_manager") - -@dataclass -class Connection: - """表示一個 MAVLink 連接""" - socket_id: int - socket: Any # pymavlink socket 或相容物件 - buffer: RingBuffer = field(default_factory=lambda: RingBuffer(1024)) - last_activity: float = field(default_factory=time.time) - is_closed: bool = False - stats: Dict[str, int] = field(default_factory=lambda: {"rx_count": 0, "rx_bytes": 0, "errors": 0}) - -class AsyncIOManager: - """ - 異步 I/O 管理器 - 負責高效處理所有 MAVLink socket 的 I/O 操作 - """ - _instance = None - _lock = threading.Lock() - - def __new__(cls, *args, **kwargs): - with cls._lock: - if cls._instance is None: - cls._instance = super(AsyncIOManager, cls).__new__(cls) - cls._instance._initialized = False - return cls._instance - - def __init__(self): - if self._initialized: - return - - self._initialized = True - self.connections: Dict[int, Connection] = {} - self.next_socket_id = 0 - self._running = False - self._event_loop = None - self._thread = None - self.check_interval = 0.01 # 檢查間隔,單位:秒 - - def start(self): - """啟動異步 I/O 管理器""" - if self._running: - logger.warning("AsyncIOManager already running") - return - - self._running = True - self._thread = threading.Thread(target=self._run_event_loop, daemon=True) - self._thread.start() - logger.info("AsyncIOManager started") - - def _run_event_loop(self): - """在專用執行緒中執行 asyncio 事件循環""" - try: - # 創建新的事件循環 - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - self._event_loop = loop - - # 添加主要協程任務 - loop.create_task(self._process_all_connections()) - - # 運行事件循環 - loop.run_forever() - except Exception as e: - logger.error(f"Error in AsyncIOManager event loop: {e}") - finally: - if loop.is_running(): - loop.stop() - logger.info("AsyncIOManager event loop stopped") - - async def _process_all_connections(self): - """處理所有連接的主循環""" - while self._running: - for socket_id, conn in list(self.connections.items()): - if conn.is_closed: - continue - - try: - # 非阻塞模式讀取數據 - await asyncio.get_event_loop().run_in_executor( - None, self._read_from_connection, conn - ) - except Exception as e: - conn.stats["errors"] += 1 - logger.error(f"Error processing connection {socket_id}: {e}") - - # 短暫休息,讓出 CPU - await asyncio.sleep(self.check_interval) - - def _read_from_connection(self, conn: Connection): - """從連接讀取數據,此函數在執行器中運行""" - try: - # 使用 mavlink 的非阻塞讀取 - msg = conn.socket.recv_msg() - if msg: - # 收到消息,更新統計資訊 - conn.last_activity = time.time() - conn.stats["rx_count"] += 1 - conn.stats["rx_bytes"] += len(msg.get_msgbuf()) - - # 將消息放入緩衝區 - if not conn.buffer.put(msg): - # 緩衝區已滿,記錄錯誤 - logger.warning(f"Buffer full for connection {conn.socket_id}, message dropped") - except Exception as e: - conn.stats["errors"] += 1 - if "Connection refused" not in str(e): # 忽略常見的連接拒絕錯誤 - logger.error(f"Error reading from socket {conn.socket_id}: {e}") - - def stop(self): - """停止異步 I/O 管理器""" - if not self._running: - return - - self._running = False - - # 停止事件循環 - if self._event_loop: - asyncio.run_coroutine_threadsafe( - self._shutdown_loop(), self._event_loop - ) - - # 等待執行緒結束 - if self._thread and self._thread.is_alive(): - self._thread.join(timeout=2.0) - if self._thread.is_alive(): - logger.warning("AsyncIOManager thread did not terminate cleanly") - - logger.info("AsyncIOManager stopped") - - async def _shutdown_loop(self): - """關閉事件循環""" - # 關閉所有連接 - for socket_id in list(self.connections.keys()): - self.remove_connection(socket_id) - - # 停止事件循環 - loop = asyncio.get_event_loop() - loop.stop() - - def register_connection(self, socket: Any) -> int: - """ - 註冊新的連接 - - Args: - socket: pymavlink socket 或相容物件 - - Returns: - int: 分配的 socket_id - """ - socket_id = self.next_socket_id - self.next_socket_id += 1 - - # 創建新的連接 - connection = Connection(socket_id=socket_id, socket=socket) - self.connections[socket_id] = connection - - logger.info(f"Registered new connection with ID {socket_id}") - return socket_id - - def remove_connection(self, socket_id: int) -> bool: - """ - 移除連接 - - Args: - socket_id: 連接 ID - - Returns: - bool: 成功移除返回 True - """ - if socket_id in self.connections: - conn = self.connections[socket_id] - conn.is_closed = True - - try: - # 關閉原始 socket - if hasattr(conn.socket, 'close'): - conn.socket.close() - except Exception as e: - logger.error(f"Error closing socket {socket_id}: {e}") - - # 從字典中移除 - self.connections.pop(socket_id) - logger.info(f"Removed connection with ID {socket_id}") - return True - return False - - def get_buffer(self, socket_id: int) -> Optional[RingBuffer]: - """ - 獲取指定連接的緩衝區 - - Args: - socket_id: 連接 ID - - Returns: - RingBuffer 或 None(如果連接不存在) - """ - if socket_id in self.connections: - return self.connections[socket_id].buffer - return None - - def get_stats(self, socket_id: int = None) -> Dict: - """ - 獲取連接統計資訊 - - Args: - socket_id: 指定連接 ID,None 表示所有連接 - - Returns: - Dict: 統計資訊 - """ - if socket_id is not None and socket_id in self.connections: - return self.connections[socket_id].stats - - # 彙總所有連接的統計資訊 - total_stats = {"rx_count": 0, "rx_bytes": 0, "errors": 0, "connection_count": len(self.connections)} - for conn in self.connections.values(): - for key, value in conn.stats.items(): - if key in total_stats: - total_stats[key] += value - return total_stats - - def send_message(self, socket_id: int, message: Any) -> bool: - """ - 發送 MAVLink 訊息 - - Args: - socket_id: 連接 ID - message: 要發送的訊息 - - Returns: - bool: 成功發送返回 True - """ - if socket_id not in self.connections: - return False - - conn = self.connections[socket_id] - if conn.is_closed: - return False - - try: - # 使用 asyncio.run_coroutine_threadsafe 在事件循環中執行 IO 操作 - future = asyncio.run_coroutine_threadsafe( - self._send_message_async(conn, message), - self._event_loop - ) - # 等待操作完成(可選) - return future.result(timeout=1.0) - except Exception as e: - logger.error(f"Error sending message on socket {socket_id}: {e}") - return False - - async def _send_message_async(self, conn: Connection, message: Any) -> bool: - """在事件循環中異步發送訊息""" - try: - # 調用執行器進行實際發送操作 - return await asyncio.get_event_loop().run_in_executor( - None, self._send_message_sync, conn, message - ) - except Exception as e: - conn.stats["errors"] += 1 - logger.error(f"Async error sending message: {e}") - return False - - def _send_message_sync(self, conn: Connection, message: Any) -> bool: - """同步發送訊息函數,在執行器中運行""" - try: - if hasattr(message, 'get_msgbuf'): - # 如果是 MAVLink 訊息對象 - conn.socket.write(message.get_msgbuf()) - else: - # 假設是已編碼的字節 - conn.socket.write(message) - conn.last_activity = time.time() - return True - except Exception as e: - conn.stats["errors"] += 1 - logger.error(f"Error in sync message send: {e}") - return False \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/devRun.py b/src/fc_network_adapter/fc_network_adapter/devRun.py deleted file mode 100644 index c638659..0000000 --- a/src/fc_network_adapter/fc_network_adapter/devRun.py +++ /dev/null @@ -1,624 +0,0 @@ -# 基礎功能的 import -import queue -import time - - -# ROS2 的 import -import rclpy - -# mavlink 的 import -from pymavlink import mavutil - -# 自定義的 import -import mavlinkObject as mo -import mavlinkDevice as md - -# ====================== 分割線 ===================== - -test_item = 12 -running_time = 3000 -print('test_item : ', test_item) - -''' -測試項 個位數 表示分離的功能 - -測試項 1X 表示 mavlink_object 的功能 測試連線的能力 -''' - -if test_item == 1: - # 測試 mavlink_object 中 updateMultiplexingList 的輸出 - print('===> Start of Program .Test ', test_item) - mavlink_object_none = mo.mavlink_object(None) - - print('=====================') - print('正常範例') - mavlink_object_none.multiplexingToAnalysis = [] - mavlink_object_none.multiplexingToReturn = [] - # mavlink_object_none.multiplexingToSwap = [[]] - ret = mavlink_object_none.updateMultiplexingList() - print('execute result : ', ret) - print('multiplexingList : ', mavlink_object_none._multiplexingList) - - print('=====================') - print('錯誤範例 長度不一致') - mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],] - ret = mavlink_object_none.updateMultiplexingList() - print('execute result : ', ret) - - print('=====================') - print('正常範例') - mo.swap_queues.append(queue.Queue()) - mo.swap_queues.append(queue.Queue()) - mavlink_object_none.multiplexingToAnalysis = [0, 1, 2, 3, 4, 5] - mavlink_object_none.multiplexingToReturn = [6, 7, 8, 9, 10] - mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],] - ret = mavlink_object_none.updateMultiplexingList() - print('execute result : ', ret) - print('multiplexingList : ', mavlink_object_none._multiplexingList) - - print('=====================') - print('錯誤範例 multiplexingToAnalysis 不可以有 -1') - mavlink_object_none.multiplexingToAnalysis = [-1, 1, 2, 3, 4, 5] - ret = mavlink_object_none.updateMultiplexingList() - print('execute result : ', ret) - - print('<=== End of Program') - -elif test_item == 2: - # 測試 mavlink_object 創建時 socket_id 是否正確 - # 說實在 這個測試項 似乎因為 python 的 GC 機制 會導致難以測試 - print('===> Start of Program .Test ', test_item) - mavlink_object_none1 = mo.mavlink_object(None) - mavlink_object_none2 = mo.mavlink_object(None) - mavlink_object_none3 = mo.mavlink_object(None) - del mavlink_object_none2 - - print(len(mo.swap_queues)) - - mavlink_object_none2 = mo.mavlink_object(None) - print(len(mo.swap_queues)) - - print(mavlink_object_none1._multiplexingList) - - print('<=== End of Program') - -elif test_item == 10: - # 需要開啟一個 ardupilot 的模擬器 - # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來 - # 只啟用了 mavlink_object 的功能 - print('===> Start of Program .Test ', test_item) - - # 創建一個空的通道 這個通道的 socket_id 是 0 - mavlink_object_none = mo.mavlink_object(None) - - # 創建另一個通道 - connection_string="udp:127.0.0.1:14550" - mavlink_socket = mavutil.mavlink_connection(connection_string) - mavlink_object1 = mo.mavlink_object(mavlink_socket) - # 設定通道轉發的參數 - mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE - mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT - mavlink_object1.multiplexingToSwap[0] = [74, ] # only VFR_HUD - - # print(mavlink_object1.multiplexingToSwap) - # print(mo.swap_queues) - - # 啟動通道 - mavlink_object1.run() - - # 確認轉拋的設定有沒有錯 - print("_multiplexingList for mavlink object :", mavlink_object1._multiplexingList) - - # 運行幾秒並印出 queue 的資料 - start_time = time.time() - while time.time() - start_time < running_time: - while not mo.fixed_stream_bridge_queue.empty(): - print('fixed_stream_bridge_queue:') - t = mo.fixed_stream_bridge_queue.get() - print('from {} : {}'.format(t[0],t[2])) - while not mo.return_packet_processor_queue.empty(): - print('return_packet_processor_queue:') - t = mo.return_packet_processor_queue.get() - print('from {} : {}'.format(t[0],t[2])) - # print(t[2].get_msgbuf()) - # print(t[2].type) - - for n in range(len(mo.swap_queues)): - q = mo.swap_queues[n] - while not q.empty(): - print('swap_queues:') - t = q.get() - print('{} gets : {}'.format(n,t)) - - - # 結束程式 退出所有 thread - mavlink_object1.running = False - mavlink_object1.thread.join() - mavlink_socket.close() - print('<=== End of Program') - -elif test_item == 11: - # 需要開啟一個 ardupilot 的模擬器 - # 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況 - # 啟用 mavlink_object 與 mavlink_bridge的thread區塊 的功能 - print('===> Start of Program .Test ', test_item) - - # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() - - # 創建通道 - connection_string="udp:127.0.0.1:15551" - mavlink_socket = mavutil.mavlink_connection(connection_string) - mavlink_object2 = mo.mavlink_object(mavlink_socket) - # mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT - # mavlink_object2.multiplexingToReturn = [] - - # 啟動連線的模組 - mavlink_object2.run() - - # 做點延遲 讓 heartbeat 先吃進來 - time.sleep(2) - print("=== connection established! ===") - - # 印出目前所有 mavlink_systems 的內容 - print('目前所有的系統 : ') - for sysid in analyzer.mavlink_systems: - print(analyzer.mavlink_systems[sysid]) - - start_time = time.time() - show_time = time.time() - compid = 1 - while time.time() - start_time < running_time: - if (time.time() - show_time) >= 1: - show_time = time.time() - for sysid in analyzer.mavlink_systems: - print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count) - print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count) - print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode']) - analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) - print(mavlink_socket.last_seq) - print("===================") - - # 結束程式 退出所有 thread - mavlink_object2.stop() - mavlink_object2.thread.join() - analyzer.stop() - - analyzer = mo.mavlink_bridge() # 這邊是測試是否只有一個 instance - analyzer.thread.join() - mavlink_socket.close() - print('<=== End of Program') - -elif test_item == 12: - # 需要開啟一個 ardupilot 的模擬器 與 GCS - # 這邊是測試 mavlink object 作為交換器功能的代碼 - print('===> Start of Program .Test ', test_item) - - # 啟動連線的模組 - analyzer = mo.mavlink_bridge() - - # 初始化輸入通道 - connection_string_in="udp:127.0.0.1:15551" - mavlink_socket_in = mavutil.mavlink_connection(connection_string_in) - mavlink_object_in = mo.mavlink_object(mavlink_socket_in) - mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT - - # 初始化輸出通道 - connection_string_out="udpout:127.0.0.1:14553" - mavlink_socket_out = mavutil.mavlink_connection(connection_string_out) - mavlink_object_out = mo.mavlink_object(mavlink_socket_out) - mavlink_object_out.multiplexingToAnalysis = [0] - - # 做一個空的通道驗證 可以拿來 debug - mavlink_object_none = mo.mavlink_object(None) - - # 讓兩個通道互相傳輸 - mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all - mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all - # mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all - - # 啟動通道 - mavlink_object_in.run() - mavlink_object_out.run() - - # 做點延遲 讓 heartbeat 先吃進來 - time.sleep(3) - print("=== connection established! ===") - - print('目前所有的系統 : ') - for sysid in analyzer.mavlink_systems: - print(analyzer.mavlink_systems[sysid]) - - # print(type(mavlink_socket_out)) - # print(type(mavlink_socket_out.mav)) - - start_time = time.time() - show_time = time.time() - while time.time() - start_time < running_time: - try: - test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False) - print('none object : ', test) - except queue.Empty: - pass - - if (time.time() - show_time) >= 2: - show_time = time.time() - for sysid in analyzer.mavlink_systems: - for compid in analyzer.mavlink_systems[sysid].components: - print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) - analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) - print("===================") - - - - # 結束程式 退出所有 thread - mavlink_object_in.stop() - mavlink_object_in.thread.join() - mavlink_object_out.stop() - mavlink_object_out.thread.join() - mavlink_socket_in.close() - mavlink_socket_out.close() - - analyzer.stop() - print('<=== End of Program') - -elif test_item == 13: - # 這邊測試看看能否狸貓換太子的方式 - # 把原來 pymavlink 的 mavtcpin 換成 unix socket - print('===> Start of Program .Test ', test_item) - - import os - import socket - - # 建立一個 mavtcpin 實例 - import mavunixin - - - - - # ===== 以下就是按照原本測試 12 的流程 ===== - # 啟動連線的模組 - analyzer = mo.mavlink_bridge() - - # 初始化輸入通道 - mavlink_socket_unix = mavunixin.mavunixin("unix:/tmp/unix_socket_mavlink.sock", source_system=1, source_component=1) - mavlink_object_in = mo.mavlink_object(mavlink_socket_unix) - mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT - - # 停留五秒 我要啟動另一邊 - time.sleep(5) - - # 初始化輸出通道 - connection_string_out="udpout:127.0.0.1:14553" - mavlink_socket_out = mavutil.mavlink_connection(connection_string_out) - mavlink_object_out = mo.mavlink_object(mavlink_socket_out) - mavlink_object_out.multiplexingToAnalysis = [0] - - # 做一個空的通道驗證 可以拿來 debug - mavlink_object_none = mo.mavlink_object(None) - - # 讓兩個通道互相傳輸 - mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all - mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all - # mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all - - # 啟動通道 - mavlink_object_in.run() - mavlink_object_out.run() - - # 做點延遲 讓 heartbeat 先吃進來 - time.sleep(3) - print("=== connection established! ===") - - print('目前所有的系統 : ') - for sysid in analyzer.mavlink_systems: - print(analyzer.mavlink_systems[sysid]) - - # print(type(mavlink_socket_out)) - # print(type(mavlink_socket_out.mav)) - - start_time = time.time() - show_time = time.time() - while time.time() - start_time < running_time: - try: - test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False) - print('none object : ', test) - except queue.Empty: - pass - - if (time.time() - show_time) >= 2: - show_time = time.time() - for sysid in analyzer.mavlink_systems: - for compid in analyzer.mavlink_systems[sysid].components: - print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) - analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) - print("===================") - - - - # 結束程式 退出所有 thread - mavlink_object_in.stop() - mavlink_object_in.thread.join() - mavlink_object_out.stop() - mavlink_object_out.thread.join() - mavlink_socket_unix.close() - mavlink_socket_out.close() - - analyzer.stop() - print('<=== End of Program') - -elif test_item == 20: - # 這邊測試 node 生成 topic 的功能 - # 利用 空的通道 發出假的 heartbeat 封包 - print('===> Start of Program .Test ', test_item) - rclpy.init() # 注意要初始化 rclpy 才能使用 node - - from pymavlink.dialects.v20 import common as mavlink2 - - sysid = 1 - compid = 1 - - # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() - - mav = mavlink2.MAVLink(None) - mav.srcSystem = sysid - mav.srcComponent = compid - - # 這是一個 heartbeat 封包 - fake_heartbeat = mavlink2.MAVLink_heartbeat_message( - type=mavlink2.MAV_TYPE_QUADROTOR, - autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA, - base_mode=3, - custom_mode=0, - system_status=0, - mavlink_version=3 - ) - mavlink_bytes = fake_heartbeat.pack(mav) - fake_heartbeat_msg = mav.parse_char(mavlink_bytes) - - print(analyzer.mavlink_systems) - - mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data - time.sleep(0.1) - print(analyzer.mavlink_systems) - - # ROS2 初始化 - analyzer._init_node() - - # 創建 ROS2 topic - analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid - print("topic created") - time.sleep(5) - - # 丟出 topic - analyzer.emit_info() - - # 結束程式 - analyzer.running = False - analyzer.destroy_node() - - rclpy.shutdown() - analyzer.stop() - analyzer.thread.join() - print('<=== End of Program') - -elif test_item == 21: - # 需要開啟一個 ardupilot 的模擬器 - # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 - - print('===> Start of Program .Test ', test_item) - rclpy.init() # 注意要初始化 rclpy 才能使用 node - - # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() - # 關於 Node 的初始化 - show_time = time.time() - analyzer._init_node() # 初始化 node - print('初始化 node 完成 耗時 : ',time.time() - show_time) - - # 創建通道 - connection_string="udp:127.0.0.1:15551" - mavlink_socket = mavutil.mavlink_connection(connection_string) - mavlink_object3 = mo.mavlink_object(mavlink_socket) - # 啟動通道 - mavlink_object3.run() - - - print('=== waiting for mavlink data ...') - time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 - - print('目前所有的系統 : ') - for sysid in analyzer.mavlink_systems: - print(analyzer.mavlink_systems[sysid]) - - compid = 1 - sysid = 1 - start_time = time.time() - analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) - end_time = time.time() - print(f"Execution time for create_flightMode: {end_time - start_time} seconds") - - print("start emit info") - - start_time = time.time() - show_time = time.time() - while time.time() - start_time < running_time: - try: - # rclpy.spin(analyzer) - analyzer.emit_info() # 這邊是測試 node 的運行 - time.sleep(1) - except KeyboardInterrupt: - break - - - # 程式結束 - analyzer.destroy_node() - rclpy.shutdown() - - # 結束程式 退出所有 thread - mavlink_object3.stop() - mavlink_object3.thread.join() - analyzer.stop() - analyzer.thread.join() - - mavlink_socket.close() - print('<=== End of Program') - -elif test_item == 22: - # 需要開啟一個 ardupilot 的模擬器 與 GCS - # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 - print('===> Start of Program .Test ', test_item) - rclpy.init() # 注意要初始化 rclpy 才能使用 node - - # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() - # 關於 Node 的初始化 - show_time = time.time() - analyzer._init_node() # 初始化 node - print('初始化 node 完成 耗時 : ',time.time() - show_time) - - # 初始化兩個通道 - connection_string_in="udp:127.0.0.1:15551" - mavlink_socket_in = mavutil.mavlink_connection(connection_string_in) - mavlink_object_in = mo.mavlink_object(mavlink_socket_in) - mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] - - - connection_string_out="udpout:127.0.0.1:14553" - mavlink_socket_out = mavutil.mavlink_connection(connection_string_out) - mavlink_object_out = mo.mavlink_object(mavlink_socket_out) - mavlink_object_out.multiplexingToAnalysis = [] - - - - # 讓兩個通道互相傳輸 - mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all - mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all - - # 啟動通道 - mavlink_object_in.run() - mavlink_object_out.run() - - print('waiting for mavlink data ...') - time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 - - print('目前所有的系統 : ') - for sysid in analyzer.mavlink_systems: - print(analyzer.mavlink_systems[sysid]) - - compid = 1 - sysid = 1 - show_time = time.time() - analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) - print(f"Execution time for create_flightMode: {time.time() - show_time} seconds") - - print("start emit info") - - start_time = time.time() - show_time = time.time() - show_time2 = time.time() - - while time.time() - start_time < running_time: - if (time.time() - show_time2) >= 1: - analyzer.emit_info() # 這邊是測試 node 的運行 - # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode'] - fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode'] - sss = mavutil.mode_string_v10(fmsg) - # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq())) - print("目前的飛行模式 : {}".format(sss)) - show_time2 = time.time() - - # if (time.time() - show_time) >= 2: - # show_time = time.time() - # for sysid in analyzer.mavlink_systems: - # for compid in analyzer.mavlink_systems[sysid].components: - # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) - # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) - # print("===================") - - - - analyzer.destroy_node() - rclpy.shutdown() - - - # 結束程式 退出所有 thread - mavlink_object_in.stop() - mavlink_object_in.thread.join() - mavlink_socket_in.close() - mavlink_object_out.stop() - mavlink_object_out.thread.join() - mavlink_socket_out.close() - analyzer.stop() - analyzer.thread.join() - - - print('<=== End of Program') - -elif test_item == 51: - - # 晉凱的測試項目 - print('===> Start of Program .Test ', test_item) - rclpy.init() # 注意要初始化 rclpy 才能使用 node - - # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() - # 關於 Node 的初始化 - show_time = time.time() - analyzer._init_node() # 初始化 node - print('初始化 node 完成 耗時 : ',time.time() - show_time) - - # 創建通道 - connection_string="udp:127.0.0.1:15551" - mavlink_socket3 = mavutil.mavlink_connection(connection_string) - mavlink_object3 = mo.mavlink_object(mavlink_socket3) - # 設定通道流動 - mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] - mavlink_object3.multiplexingToReturn = [] # - # mavlink_object3.multiplexingToSwap = [] # - # 啟動通道 - mavlink_object3.run() - - - - print('waiting for mavlink data ...') - time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 - - compid = 1 - sysid = 1 - start_time = time.time() - analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid]) - end_time = time.time() - print(f"Execution time for create all topic: {end_time - start_time} seconds") - - print("start emit info") - - start_time = time.time() - show_time = time.time() - while time.time() - start_time < running_time: - try: - # rclpy.spin(analyzer) - analyzer.emit_info() # 這邊是測試 node 的運行 - time.sleep(0.5) - except KeyboardInterrupt: - break - - analyzer.destroy_node() - rclpy.shutdown() - - # 結束程式 退出所有 thread - mavlink_object3.stop() - mavlink_object3.thread.join() - analyzer.stop() - analyzer.thread.join() - - mavlink_socket3.close() - print('<=== End of Program') \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py index 4ae4c98..47a1c92 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py @@ -1,6 +1,6 @@ # 自定義的 import -from theLogger import setup_logger +from .theLogger import setup_logger # ====================== 分割線 ===================== diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index b55107c..d0231ed 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -1,8 +1,7 @@ ''' # 不同的匯流排只有單一種通訊協定 -# 匯流排接到訊息後透過 queue stack 傳送到橋接器 -# 會有三種 Queue 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 3. 轉拋串流 -# 第三種比較麻煩 會需要用 list 管理多個轉換器的 queue +# 匯流排接到訊息後透過 ring buffer 傳送到橋接器 +# 會有兩種 RingBuffer 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 # 橋接器會將解析得到的結論 再透過 ros2 的 publisher 發送出去 # 關於 mavlink 採用 pymavlink 這個套件 製作匯流排 @@ -13,8 +12,11 @@ # 基礎功能的 import import threading -import queue +# import queue import time +import asyncio +from enum import Enum, auto +# from typing import Dict, List, Optional, Set, Any, Tuple # mavlink 的 import from pymavlink import mavutil @@ -23,38 +25,17 @@ from pymavlink import mavutil from rclpy.node import Node # 自定義的 import -from mavlinkDevice import mavlink_device -from mavlinkPublish import mavlink_publisher -from theLogger import setup_logger - +from .mavlinkDevice import mavlink_device, mavlink_systems +from .mavlinkPublish import mavlink_publisher +from .theLogger import setup_logger +from .ringBuffer import RingBuffer # ====================== 分割線 ===================== logger = setup_logger("mavlinkObject.py") -# 基礎功能的 import -import threading -import queue -import time - -# mavlink 的 import -from pymavlink import mavutil - -# ROS2 的 import -from rclpy.node import Node - -# 自定義的 import -from mavlinkDevice import mavlink_device, mavlink_systems -from mavlinkPublish import mavlink_publisher -from theLogger import setup_logger - -# ====================== 分割線 ===================== - -logger = setup_logger("mavlinkObject.py") - -fixed_stream_bridge_queue = queue.Queue() -return_packet_processor_queue = queue.Queue() -swap_queues = [] # 這個 list 用來存放轉拋串流的 queue # 這些 queue 同時也是各自 mavlink_object 的 output buffer +stream_bridge_ring = RingBuffer(capacity=1024, buffer_id=255) +return_packet_ring = RingBuffer(capacity=256, buffer_id=254) # ====================== 分割線 ===================== @@ -75,6 +56,8 @@ class mavlink_bridge(Node, mavlink_publisher): node 區塊則是處理 ros2 的 publisher 和 subscriber 訂閱相關 藉由控制 ros2 的機制再把 device object 的資訊發送出去 + fixed_stream_bridge_queue 置換成 stream_bridge_ring + ps. 我限制了這個 class 只能有一個 instance ''' _instance = None @@ -106,13 +89,25 @@ class mavlink_bridge(Node, mavlink_publisher): # === Thread 區塊 === def _run_thread(self): - # start_time = time.time() # debug + start_time = time.time() # debug + run_loop_count = 0 # debug logger.info('Start of mavlink_bridge._run_thread') - # 從 Queue fixed_stream_bridge_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理 + # 從 Queue stream_bridge_ring 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理 while self.running: - if fixed_stream_bridge_queue.empty(): + # # 這個迴圈每秒鐘被執行了幾輪? # 這整段都是 debug 用的 + # current_time = time.time() + # if (current_time - start_time) >= 1.0: + # logger.info(f'mavlink_bridge._run_thread loop count: {run_loop_count}') + # run_loop_count = 0 + # start_time = current_time + # else: + # run_loop_count += 1 + # # ========================= + + + if stream_bridge_ring.is_empty(): continue - msg_pack = fixed_stream_bridge_queue.get() + msg_pack = stream_bridge_ring.get() msg = msg_pack[2] sysid = msg.get_srcSystem() @@ -150,7 +145,7 @@ class mavlink_bridge(Node, mavlink_publisher): this_component.emitParams['flightMode'] = msg # debug # print("mav_type : ", msg.type) # debug - # print("get mode :", mavutil.mode_string_v10(msg)) # debug + print("get mode :", mavutil.mode_string_v10(msg)) # debug # print("record mode :", this_component.emitParams['flightMode_mode']) # debug elif msg_id == 30: # ATTITUDE 處理 @@ -201,24 +196,45 @@ class mavlink_bridge(Node, mavlink_publisher): # ====================== 分割線 ===================== -class mavlink_object(): +''' +本次改版的暫時記錄註解於此 +1. mavlink_object 中 由於 Queue 的效能太差 會完全移除 + 其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現 + 而 multiplexingToSwap 會完全被移除代替方式下一條描述 +2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流 + (註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object) + 因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移 + 並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出 +3. mavlink_object 需要加上 state 去管理其狀態 +4. mavlink_object 需要加上 target port 去管理寫出的目標 +5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道 +6. 基於第1,2項 updateMultiplexingList 會被完全移除 +7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object +8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring 與 return_packet_ring 來取代 另外 swap_queues 會被完全移除 + +''' + +# 定義 mavlink_object 的狀態 +class MavlinkObjectState(Enum): + INIT = auto() # 初始化狀態 + RUNNING = auto() # 運行中狀態 + ERROR = auto() # 錯誤狀態 + CLOSED = auto() # 已關閉狀態 + +class mavlink_object: ''' 每個 mavlink bus 都會有一個 mavlink_object - 其中主要是 thread 做統計封包與分流 - 分流表的控制在三個 list 分別為 - multiplexingToAnalysis : 這個 list 是用來分流到固定串流橋接器的 - multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的 - multiplexingToSwap : 這個 list 是用來分流到轉拋串流的 - 透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為 + 使用 asyncio 處理資料流 用 RingBuffer 來分配訊息 + 直接透過 socket 寫出 ''' - mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) } - socket_num = 0 # 用來記錄目前的 socket 數量 + mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) } + socket_num = 0 # 用來記錄目前的 socket 數量 + def __new__(cls, *args, **kwargs): # 每創建一個實例 就將其添加到 mavlinkObjects 列表中 # 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號 # 若序號無中斷 則 socket_id 往後加一 若序號有中斷 則填補最小的序號 # socket_id 從 0 開始 - instance = super().__new__(cls) socket_id = 0 for k in cls.mavlinkObjects.keys(): @@ -232,180 +248,389 @@ class mavlink_object(): return instance def __init__(self, socket): + # 登入所需的 socket self.mavlink_socket = socket - # 這邊變數是執行的時候被使用的 不要直接寫入它 - self._multiplexingList = [] - # 存放要發送的訊息的 queue 或稱 buffer - self.output_buffer = queue.Queue() - if self.socket_id >= len(swap_queues): - swap_queues.append(self.output_buffer) - else: - swap_queues[self.socket_id] = self.output_buffer - + + # 存放目前是否分流到固定串流橋接器和回傳封包處理器的標誌 # 都是要做的 把這個判斷拿掉 + # self.send_to_bridge = True + # self.send_to_return = False + + # 用於主線程發送消息的緩衝區 + self.outgoing_msg_lock = threading.Lock() + self.outgoing_msgs = [] + + # 記錄訊息過濾類型 (可選) + self.bridge_msg_types = [0] # 默認只處理 HEARTBEAT + self.return_msg_types = [] + + # 目標端口 + self.target_sockets = [] + # 關聯到全域變數 global mavlink_systems self.mavlink_systems = mavlink_systems - - # 這三個 list 用來分配不同的訊息到不同的 queue - self.multiplexingToAnalysis = [ - 0, # HEARTBEAT # 挺必要的項目 - # 24, # GPS_RAW_INT - # 30, # ATTITUDE - # 33, # GLOBAL_POSITION_INT - # 74, # VFR_HUD - # 147, # BATTERY_STATUS - ] - self.multiplexingToReturn = [] - self.multiplexingToSwap = [ - [] for _ in range(len(swap_queues)) - ] - - # 刷新其他 mavlink_object 的 multiplexingToSwap - for k, object in self.mavlinkObjects.items(): - if (k != self.socket_id) and (len(object.multiplexingToSwap) <= self.socket_id): - object.multiplexingToSwap.append([]) - object.updateMultiplexingList() - - logger.info('mavlink_object instance {} created'.format(self.socket_id)) + + # 物件變數 + self.task = None # Task reference + self.dirtyDataCount = 0 # 髒資料計數器 + self.dirtyDataMax = 10 # 髒資料容許閾值 + + self.state = MavlinkObjectState.INIT + logger.info(f'mavlink_object instance {self.socket_id} created') def __del__(self): - # 停下自己的 thread - if self.mavlink_socket != None: - self.mavlink_socket.close() - self.stop() - - # 移除其他 mavlink_object 的 multiplexingToSwap - for k, object in self.mavlinkObjects.items(): - if (k != self.socket_id) and (len(object.multiplexingToSwap) > self.socket_id): - object.multiplexingToSwap[self.socket_id] = [] - object.updateMultiplexingList() - - # 移除自己的 swap_queues - swap_queues[self.socket_id] = None - + # 停止 asyncio task + self.stop() + + # 關閉 socket + if hasattr(self, 'mavlink_socket') and self.mavlink_socket: + try: + self.mavlink_socket.close() + except: + pass + # 處理 class 的 instance 記錄 - self.socket_num -= 1 - self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance - - # logger 模組在這邊會一直掛掉 找不到問題 先用 print 代替 - # print('mavlink_object instance {} deleted'.format(self.socket_id)) # debug - # logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) + if hasattr(self.__class__, 'socket_num'): + self.__class__.socket_num -= 1 + + if hasattr(self.__class__, 'mavlinkObjects') and hasattr(self, 'socket_id'): + if self.socket_id in self.__class__.mavlinkObjects: + self.__class__.mavlinkObjects.pop(self.socket_id) + + # 這段不知道怎麼了 反正會一直讓 logger ERROR 我先關掉 # try: - # logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) + # logger.info(f'mavlink_object instance {self.socket_id} deleted') # except Exception as e: # print(f"Error logging in __del__: {e}") - # if 'logger' in globals() and logger: - # logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) - def run(self): - # TODO 檢查 socket 是否有連線 - self.thread = threading.Thread(target=self._run_thread) - self.running = self.updateMultiplexingList() - self.thread.start() + def start(self): + """啟動 mavlink_object 處理循環""" + if self.state == MavlinkObjectState.RUNNING: + logger.warning(f"mavlink_object {self.socket_id} is already running") + return + + self.state = MavlinkObjectState.RUNNING + # 實際的啟動會由 async_io_manager 處理 def stop(self): - self.running = False - - def _run_thread(self): - logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id)) - start_time = time.time() - while self.running: - timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差 - - # 處理接收到的封包 + """停止 mavlink_object 處理循環""" + if self.state in (MavlinkObjectState.CLOSED, MavlinkObjectState.ERROR): + return + + self.state = MavlinkObjectState.CLOSED + if self.task: + if not self.task.done(): + self.task.cancel() + + async def process_data(self): + """處理 mavlink 數據的主要 asyncio 協程""" + logger.info(f'Start of mavlink_object.process_data id: {self.socket_id}') + + while self.state == MavlinkObjectState.RUNNING: + timestamp = time.time() try: + # 處理接收到的封包 mavlinkMsg = self.mavlink_socket.recv_msg() except Exception as e: - logger.critical(f"Receiving data not mavlink format. Object Delete.") - print(f"[mavlinkObject.py] Error receiving mavlink data: {e}") - print(mavlinkMsg) - self.running = False - break - - if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega. etc...' - # 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間 - sysid = mavlinkMsg.get_srcSystem() - compid = mavlinkMsg.get_srcComponent() - if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息 - # mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型 - mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) - - # break # Debug function can put here you can see the input data from mavlink - - # 將訊息依照 multiplexing list 分發到不同的 queue - for i in range(len(self._multiplexingList)): - if (self._multiplexingList[i] == []): - continue - elif (mavlinkMsg.get_msgId() in self._multiplexingList[i]) or (self._multiplexingList[i][0] == -1): - if i == 0: - fixed_stream_bridge_queue.put((self.socket_id,timestamp,mavlinkMsg)) - elif i == 1: - return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg)) - else: - _queue = swap_queues[i-2] - _queue.put(mavlinkMsg) - - # 處理要送出的封包 - # 如果 有資料在 output_buffer 中則將其取出並發送 - # 沒有就略過發送 + logger.warning(f"Error in mavlink_object.process_data for id {self.socket_id}: {e}") + self.dirtyDataCount += 1 + + if self.dirtyDataCount >= self.dirtyDataMax: + logger.error(f"Too many dirty data received in mavlink_object {self.socket_id}, entering ERROR state") + self.state = MavlinkObjectState.ERROR + # 不直接退出,嘗試容忍髒數據 + await asyncio.sleep(0.01) # 短暫暫停 + continue + try: - mavlinkMsg_send = self.output_buffer.get(block=False) - except queue.Empty: - mavlinkMsg_send = None - # except Exception as e: - # logger.error(f"Error getting data from output_buffer: {e}") - # mavlinkMsg_send = None - - if mavlinkMsg_send: - # self.mavlink_socket.mav.send(mavlinkMsg_send) - self.mavlink_socket.write(mavlinkMsg_send.get_msgbuf()) # 這邊會將封包寫入 socket 中 - - # thread 結束 - logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id)) - - def updateMultiplexingList(self): - ''' - 更新 multiplexing list 並做簡單的檢查 - ''' - # 檢查 multiplexingToAnalysis 與 multiplexingToReturn 是否有 -1 值 - check = (-1 in self.multiplexingToAnalysis) or (-1 in self.multiplexingToReturn) - if check: - logger.error('MultiplexingToAnalysis or MultiplexingToReturn NOT all type spilt. socket id : {}'.format(self.socket_id)) - return False - - # 檢查 multiplexingToSwap 與 swap_queues 的長度是否一致 而且 swap_queues 的長度不能為 0 - check = len(self.multiplexingToSwap) != len(swap_queues) or len(swap_queues) == 0 - if check: - logger.error('Multiplexing Queue not fit List , Please check the list. socket id : {}'.format(self.socket_id)) + if mavlinkMsg: + # 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間 + sysid = mavlinkMsg.get_srcSystem() + compid = mavlinkMsg.get_srcComponent() + + if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息 + self.mavlink_systems[sysid].updateComponentPacketCount( + compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) #TODO 這邊怪怪的 好像要再檢查 + + # 分發訊息到 RingBuffer + msg_id = mavlinkMsg.get_msgId() + + # if self.send_to_bridge and (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types): + if (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types): + stream_bridge_ring.put((self.socket_id, timestamp, mavlinkMsg)) + + # if self.send_to_return and (msg_id in self.return_msg_types or -1 in self.return_msg_types): + if (msg_id in self.return_msg_types or -1 in self.return_msg_types): + return_packet_ring.put((self.socket_id, timestamp, mavlinkMsg)) + + # 將接收到的訊息轉發給所有目標端口 + for target_port in self.target_sockets: + if target_port != self.socket_id and target_port in self.mavlinkObjects: + target_obj = self.mavlinkObjects[target_port] + if target_obj.state == MavlinkObjectState.RUNNING: + try: + target_obj.mavlink_socket.write(mavlinkMsg.get_msgbuf()) + except Exception as e: + logger.error(f"Error forwarding message to port {target_port}: {e}") + + with self.outgoing_msg_lock: + if self.outgoing_messages and (send_msg := self.outgoing_messages.pop(0)): + try: + self.mavlink_socket.write(send_msg) + except Exception as e: + logger.error(f"mavlink_object {self.socket_id} failed to send message: {e}") + + # 這邊的重點不是延遲 而是透過 await 讓出 event loop 的控制權 + await asyncio.sleep(0.001) + + except asyncio.CancelledError: + logger.info(f'mavlink_object.process_data for id {self.socket_id} was cancelled') + break + except Exception as e: + await asyncio.sleep(0.01) # 短暫暫停避免CPU過載 + + + logger.info(f'End of mavlink_object.process_data id: {self.socket_id}') + self.state = MavlinkObjectState.CLOSED + + def add_target_socket(self, target_socket_id): + """添加一個目標端口用於轉發""" + if target_socket_id not in self.target_sockets and target_socket_id != self.socket_id: + self.target_sockets.append(target_socket_id) + logger.info(f"mavlink_object Added target port {target_socket_id} to mavlink_object {self.socket_id}") + return True + return False + + def remove_target_socket(self, target_socket_id): + """移除目標端口""" + if target_socket_id in self.target_sockets: + self.target_sockets.remove(target_socket_id) + logger.info(f"mavlink_object Removed target port {target_socket_id} from mavlink_object {self.socket_id}") + return True + return False + + def set_bridge_message_types(self, msg_types): + """設置需要分流到橋接器的訊息類型""" + if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types): + self.bridge_msg_types = msg_types + return True + logger.error(f"Invalid bridge message types: {msg_types}") + return False + + def set_return_message_types(self, msg_types): + """設置需要分流到回傳處理器的訊息類型""" + if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types): + self.return_msg_types = msg_types + return True + logger.error(f"Invalid return message types: {msg_types}") + return False + + def send_message(self, message_bytes): + """ + 從主線程向此 mavlink_object 的 socket 發送數據 + 將數據添加到簡單的列表中,由 asyncio 任務處理 + + Args: + message_bytes: 要發送的 mavlink 消息的字節數據 + + Returns: + bool: 是否成功添加消息到列表 + """ + if self.state != MavlinkObjectState.RUNNING: + logger.warning(f"Cannot send message: mavlink_object {self.socket_id} is not running") return False - - # 對應到自己的 multiplexingToSwap 必需為空 避免對自己迴圈轉拋 + try: - self.multiplexingToSwap[self.socket_id] = [] - except IndexError: - logger.error('Multiplexing List of self socket id should be void. socket id : {}'.format(self.socket_id)) - return False - - # 組合 multiplexing list - multiL_tmp = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToSwap - - # 檢查 multiplexing list 格式是否有錯誤 # 全部都要是 list 每個 list 裡面都要是 int - check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in multiL_tmp) - if not check: - logger.error('Multiplexing List Format Error, Please check the list. socket id : {}'.format(self.socket_id)) - return False # 若有錯誤則回傳 False - - # 更新 multiplexing list - self._multiplexingList = multiL_tmp + # 使用鎖保護共享資源訪問 + with self.outgoing_messages_lock: + self.outgoing_messages.append(message_bytes) + return True + except Exception as e: + logger.error(f"Error queueing message for mavlink_object {self.socket_id}: {e}") + return False + + # def enable_bridge(self, enable=True): + # """啟用或禁用橋接器分流""" + # self.send_to_bridge = enable + + # def enable_return(self, enable=True): + # """啟用或禁用回傳處理器分流""" + # self.send_to_return = enable + +class async_io_manager: + """ + 管理所有 mavlink_object 實例的 asyncio 任務 + 提供單一線程來處理所有 mavlink 通道的數據 + """ + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + with cls._lock: + if cls._instance is None: + cls._instance = super(async_io_manager, cls).__new__(cls) + return cls._instance + + def __init__(self): + if not hasattr(self, 'initialized'): + self.initialized = True + self.loop = None + self.main_task = None + self.running = False + self.managed_objects = {} # socket_id: task + + def start(self): + """啟動 async_io_manager 和其管理的所有 mavlink_object""" + if self.running: + logger.warning("async_io_manager already running") + return + + self.running = True + self.thread = threading.Thread(target=self._run_event_loop) + self.thread.start() + logger.info("async_io_manager started") + + def stop(self): + """停止 async_io_manager 和其管理的所有 mavlink_object""" + if not self.running: + return + + self.running = False + + if self.loop: + # 取消所有任務 + for socket_id in list(self.managed_objects.keys()): + self.remove_mavlink_object(socket_id) + + # 停止事件循環 + if self.main_task and not self.main_task.done(): + asyncio.run_coroutine_threadsafe(self._shutdown(), self.loop) + + # 等待線程結束 + if hasattr(self, 'thread') and self.thread.is_alive(): + self.thread.join(timeout=5.0) + + logger.info("async_io_manager stopped") + + async def _shutdown(self): + """正確關閉事件循環的協程""" + tasks = [task for task in asyncio.all_tasks(self.loop) if task is not asyncio.current_task()] + + for task in tasks: + task.cancel() + + await asyncio.gather(*tasks, return_exceptions=True) + self.loop.stop() + + def _run_event_loop(self): + """在獨立線程中運行事件循環""" + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + try: + self.main_task = self.loop.create_task(self._main_task()) + self.loop.run_forever() + except Exception as e: + logger.error(f"Error in async_io_manager event loop: {e}") + finally: + self.loop.close() + self.loop = None + self.running = False + logger.info("async_io_manager event loop ended") + + async def _main_task(self): + """主任務協程,用於監視和管理其他任務""" + logger.info("async_io_manager main task started") + + try: + while self.running: + # 這邊就是一個無窮迴圈 確保 async_io_manager 物件續存 + # 每秒鐘檢查並移除已完成或已取消的任務 + await asyncio.sleep(1.0) + for socket_id in list(self.managed_objects.keys()): + task = self.managed_objects[socket_id] + + if task.done(): + try: + exc = task.exception() + if exc: + logger.error(f"Task for mavlink_object {socket_id} raised exception: {exc}") + except (asyncio.CancelledError, asyncio.InvalidStateError): + pass + + if socket_id in mavlink_object.mavlinkObjects: + print(f"this is manager, i make socket_id {socket_id} closed, he is now in {mavlink_object.mavlinkObjects[socket_id].state} state.") # debug + mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED + + del self.managed_objects[socket_id] + + + + except asyncio.CancelledError: + logger.info("async_io_manager main task was cancelled") + except Exception as e: + logger.error(f"Error in async_io_manager main task: {e}") + + logger.info("async_io_manager main task ended") + + def add_mavlink_object(self, mavlink_obj): + """添加一個 mavlink_object 實例到 async_io_manager 並啟動其處理任務""" + if not self.running: + logger.error("Cannot add mavlink_object: async_io_manager is not running") + return False + + if not isinstance(mavlink_obj, mavlink_object): + logger.error(f"Invalid mavlink_object: {mavlink_obj}") + return False + + socket_id = mavlink_obj.socket_id + + if socket_id in self.managed_objects: + logger.warning(f"mavlink_object {socket_id} already managed") + return False + + # 創建並啟動任務 + if self.loop: + # task = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop).result() + future = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop) + + # print(f"Task created for mavlink_object {socket_id}: {future1}") # debug + # task = future1.result() + self.managed_objects[socket_id] = future + mavlink_obj.task = future + mavlink_obj.state = MavlinkObjectState.RUNNING + logger.info(f"Added mavlink_object {socket_id} to async_io_manager") + return True + + return False + + def remove_mavlink_object(self, socket_id): + """從 async_io_manager 中移除一個 mavlink_object 實例並取消其處理任務""" + if socket_id not in self.managed_objects: + logger.warning(f"mavlink_object {socket_id} not managed by async_io_manager") + return False + + # 取消任務 + task = self.managed_objects[socket_id] + if not task.done(): + task.cancel() + + # 從管理列表中移除 + del self.managed_objects[socket_id] + + # 更新 mavlink_object 狀態 + if socket_id in mavlink_object.mavlinkObjects: + mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED + + logger.info(f"Removed mavlink_object {socket_id} from async_io_manager") return True + + def get_managed_objects(self): + """獲取所有被管理的對象列表""" + return list(self.managed_objects.keys()) # ====================== 分割線 ===================== -# 整合到 ros2 之後的程式進入點 -def main_node(): - pass - - if __name__ == '__main__': pass diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py b/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py index 028e3ee..727e47f 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py @@ -9,14 +9,16 @@ publisher topic name 命名規則為 <前綴詞>/s/<具體 topic> ''' +# ROS2 的 import import std_msgs.msg import sensor_msgs.msg import geometry_msgs.msg import mavros_msgs.msg -from theLogger import setup_logger import math +# 自定義的 import +from .theLogger import setup_logger logger = setup_logger("mavlinkPublish.py") diff --git a/src/fc_network_adapter/fc_network_adapter/test_mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/test_mavlinkObject.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fc_network_adapter/tests/devRun.py b/src/fc_network_adapter/tests/devRun.py new file mode 100644 index 0000000..0707b51 --- /dev/null +++ b/src/fc_network_adapter/tests/devRun.py @@ -0,0 +1,448 @@ +import os +import sys +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# 基礎功能的 import +import queue +import time + +# ROS2 的 import +import rclpy + +# mavlink 的 import +from pymavlink import mavutil + +# 自定義的 import +from fc_network_adapter import mavlinkObject as mo +from fc_network_adapter import mavlinkDevice as md + +# ====================== 分割線 ===================== + +test_item = 21 +running_time = 30 + + +print('test_item : ', test_item) + +''' +測試項 個位數 表示分離的功能 + +測試項 1X 表示 mavlink_object 的功能 測試連線的能力 +''' + +if test_item == 10: + # 需要開啟一個 ardupilot 的模擬器 + # 測試 mavlink_object 放入 ring buffer 的應用 + print('===> Start of Program .Test ', test_item) + + # 清空 ring buffer + mo.stream_bridge_ring.clear() + mo.return_packet_ring.clear() + + manager = mo.async_io_manager() + manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 初始化輸入通道 + connection_string="udp:127.0.0.1:14560" + mavlink_socket1 = mavutil.mavlink_connection(connection_string) + mavlink_object1 = mo.mavlink_object(mavlink_socket1) + + manager.add_mavlink_object(mavlink_object1) + + start_time = time.time() + while (time.time() - start_time) < running_time: + items_a = mo.stream_bridge_ring.get_all() + items_b = mo.return_packet_ring.get_all() + try: + print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}") + except IndexError: + print("stream_bridge_ring is empty") + + try: + print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}") + except IndexError: + print("return_packet_ring is empty") + time.sleep(1) + + manager.stop() + + print('<=== End of Program') + +elif test_item == 11: + # 需要開啟一個 ardupilot 的模擬器 + # 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況 + print('===> Start of Program .Test ', test_item) + + # 清空 ring buffer + mo.stream_bridge_ring.clear() + mo.return_packet_ring.clear() + + manager = mo.async_io_manager() + manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 初始化輸入通道 + connection_string="udp:127.0.0.1:14560" + mavlink_socket1 = mavutil.mavlink_connection(connection_string) + mavlink_object1 = mo.mavlink_object(mavlink_socket1) + manager.add_mavlink_object(mavlink_object1) + + # 啟動 mavlink_bridge + analyzer = mo.mavlink_bridge() + + time.sleep(3) + + # 印出目前所有 mavlink_systems 的內容 + print('目前所有的系統 : ') + for sysid in analyzer.mavlink_systems: + print(analyzer.mavlink_systems[sysid]) + + start_time = time.time() + show_time = time.time() + while time.time() - start_time < running_time: + if (time.time() - show_time) >= 2: + # print("mark B") + + show_time = time.time() + for sysid in analyzer.mavlink_systems: + for compid in analyzer.mavlink_systems[sysid].components: + print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) + analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) + print("===================") + + manager.stop() + analyzer.stop() + + print('<=== End of Program') + +elif test_item == 12: + # 需要開啟一個 ardupilot 的模擬器 與 GCS + # 這邊是測試 mavlink object 作為交換器功能的代碼 + print('===> Start of Program .Test ', test_item) + + # 清空 ring buffer + mo.stream_bridge_ring.clear() + mo.return_packet_ring.clear() + + manager = mo.async_io_manager() + manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 初始化輸入通道 + connection_string="udp:127.0.0.1:14560" + mavlink_socket_in1 = mavutil.mavlink_connection(connection_string) + mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1) + + connection_string="udp:127.0.0.1:14561" + mavlink_socket_in2 = mavutil.mavlink_connection(connection_string) + mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2) + + # 初始化輸出通道 + connection_string="udpout:127.0.0.1:14550" + mavlink_socket_out = mavutil.mavlink_connection(connection_string) + mavlink_object_out = mo.mavlink_object(mavlink_socket_out) + + manager.add_mavlink_object(mavlink_object_out) + manager.add_mavlink_object(mavlink_object_in1) + manager.add_mavlink_object(mavlink_object_in2) + + mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id) + mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id) + + mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id) + mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id) + + start_time = time.time() + while (time.time() - start_time) < running_time: + + time.sleep(1) + + manager.stop() + + print('<=== End of Program') + + +# elif test_item == 20: +# # 這邊測試 node 生成 topic 的功能 +# # 利用 空的通道 發出假的 heartbeat 封包 +# print('===> Start of Program .Test ', test_item) +# rclpy.init() # 注意要初始化 rclpy 才能使用 node + +# from pymavlink.dialects.v20 import common as mavlink2 + +# sysid = 1 +# compid = 1 + +# # 啟動 mavlink_bridge +# analyzer = mo.mavlink_bridge() + +# mav = mavlink2.MAVLink(None) +# mav.srcSystem = sysid +# mav.srcComponent = compid + +# # 這是一個 heartbeat 封包 +# fake_heartbeat = mavlink2.MAVLink_heartbeat_message( +# type=mavlink2.MAV_TYPE_QUADROTOR, +# autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA, +# base_mode=3, +# custom_mode=0, +# system_status=0, +# mavlink_version=3 +# ) +# mavlink_bytes = fake_heartbeat.pack(mav) +# fake_heartbeat_msg = mav.parse_char(mavlink_bytes) + +# print(analyzer.mavlink_systems) + +# mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data +# time.sleep(0.1) +# print(analyzer.mavlink_systems) + +# # ROS2 初始化 +# analyzer._init_node() + +# # 創建 ROS2 topic +# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid +# print("topic created") +# time.sleep(5) + +# # 丟出 topic +# analyzer.emit_info() + +# # 結束程式 +# analyzer.running = False +# analyzer.destroy_node() + +# rclpy.shutdown() +# analyzer.stop() +# analyzer.thread.join() +# print('<=== End of Program') + +elif test_item == 21: + # 需要開啟一個 ardupilot 的模擬器 + # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 + + print('===> Start of Program .Test ', test_item) + # 初始化 rclpy 才能使用 node + rclpy.init() + + # 清空 ring buffer + mo.stream_bridge_ring.clear() + mo.return_packet_ring.clear() + + manager = mo.async_io_manager() + manager.start() + + # 啟動 mavlink_bridge + analyzer = mo.mavlink_bridge() + # 關於 Node 的初始化 + show_time = time.time() + analyzer._init_node() # 初始化 node + print('初始化 node 完成 耗時 : ',time.time() - show_time) + + time.sleep(0.5) # 系統 Setup 完成 + + + # 創建通道 + connection_string="udp:127.0.0.1:14560" + mavlink_socket = mavutil.mavlink_connection(connection_string) + mavlink_object3 = mo.mavlink_object(mavlink_socket) + manager.add_mavlink_object(mavlink_object3) + + print('=== waiting for mavlink data ...') + time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 + + print('目前所有的系統 : ') + for sysid in analyzer.mavlink_systems: + print(analyzer.mavlink_systems[sysid]) + + compid = 1 + sysid = 1 + start_time = time.time() + analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) + end_time = time.time() + print(f"Execution time for create_flightMode: {end_time - start_time} seconds") + + print("start emit info") + + start_time = time.time() + show_time = time.time() + while time.time() - start_time < running_time: + try: + # print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode']) + analyzer.emit_info() # 這邊是測試 node 的運行 + time.sleep(1) + except KeyboardInterrupt: + break + + + # 程式結束 + analyzer.destroy_node() + rclpy.shutdown() + + # 結束程式 退出所有 thread + manager.stop() + analyzer.stop() + analyzer.thread.join() + + mavlink_socket.close() + print('<=== End of Program') + +# elif test_item == 22: +# # 需要開啟一個 ardupilot 的模擬器 與 GCS +# # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 +# print('===> Start of Program .Test ', test_item) +# rclpy.init() # 注意要初始化 rclpy 才能使用 node + +# # 啟動 mavlink_bridge +# analyzer = mo.mavlink_bridge() +# # 關於 Node 的初始化 +# show_time = time.time() +# analyzer._init_node() # 初始化 node +# print('初始化 node 完成 耗時 : ',time.time() - show_time) + +# # 初始化兩個通道 +# connection_string_in="udp:127.0.0.1:15551" +# mavlink_socket_in = mavutil.mavlink_connection(connection_string_in) +# mavlink_object_in = mo.mavlink_object(mavlink_socket_in) +# mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] + + +# connection_string_out="udpout:127.0.0.1:14553" +# mavlink_socket_out = mavutil.mavlink_connection(connection_string_out) +# mavlink_object_out = mo.mavlink_object(mavlink_socket_out) +# mavlink_object_out.multiplexingToAnalysis = [] + + + +# # 讓兩個通道互相傳輸 +# mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all +# mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all + +# # 啟動通道 +# mavlink_object_in.run() +# mavlink_object_out.run() + +# print('waiting for mavlink data ...') +# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 + +# print('目前所有的系統 : ') +# for sysid in analyzer.mavlink_systems: +# print(analyzer.mavlink_systems[sysid]) + +# compid = 1 +# sysid = 1 +# show_time = time.time() +# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# print(f"Execution time for create_flightMode: {time.time() - show_time} seconds") + +# print("start emit info") + +# start_time = time.time() +# show_time = time.time() +# show_time2 = time.time() + +# while time.time() - start_time < running_time: +# if (time.time() - show_time2) >= 1: +# analyzer.emit_info() # 這邊是測試 node 的運行 +# # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode'] +# fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode'] +# sss = mavutil.mode_string_v10(fmsg) +# # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq())) +# print("目前的飛行模式 : {}".format(sss)) +# show_time2 = time.time() + +# # if (time.time() - show_time) >= 2: +# # show_time = time.time() +# # for sysid in analyzer.mavlink_systems: +# # for compid in analyzer.mavlink_systems[sysid].components: +# # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) +# # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) +# # print("===================") + + + +# analyzer.destroy_node() +# rclpy.shutdown() + + +# # 結束程式 退出所有 thread +# mavlink_object_in.stop() +# mavlink_object_in.thread.join() +# mavlink_socket_in.close() +# mavlink_object_out.stop() +# mavlink_object_out.thread.join() +# mavlink_socket_out.close() +# analyzer.stop() +# analyzer.thread.join() + + +# print('<=== End of Program') + +# elif test_item == 51: + +# # 晉凱的測試項目 +# print('===> Start of Program .Test ', test_item) +# rclpy.init() # 注意要初始化 rclpy 才能使用 node + +# # 啟動 mavlink_bridge +# analyzer = mo.mavlink_bridge() +# # 關於 Node 的初始化 +# show_time = time.time() +# analyzer._init_node() # 初始化 node +# print('初始化 node 完成 耗時 : ',time.time() - show_time) + +# # 創建通道 +# connection_string="udp:127.0.0.1:15551" +# mavlink_socket3 = mavutil.mavlink_connection(connection_string) +# mavlink_object3 = mo.mavlink_object(mavlink_socket3) +# # 設定通道流動 +# mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] +# mavlink_object3.multiplexingToReturn = [] # +# # mavlink_object3.multiplexingToSwap = [] # +# # 啟動通道 +# mavlink_object3.run() + + + +# print('waiting for mavlink data ...') +# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 + +# compid = 1 +# sysid = 1 +# start_time = time.time() +# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# end_time = time.time() +# print(f"Execution time for create all topic: {end_time - start_time} seconds") + +# print("start emit info") + +# start_time = time.time() +# show_time = time.time() +# while time.time() - start_time < running_time: +# try: +# # rclpy.spin(analyzer) +# analyzer.emit_info() # 這邊是測試 node 的運行 +# time.sleep(0.5) +# except KeyboardInterrupt: +# break + +# analyzer.destroy_node() +# rclpy.shutdown() + +# # 結束程式 退出所有 thread +# mavlink_object3.stop() +# mavlink_object3.thread.join() +# analyzer.stop() +# analyzer.thread.join() + +# mavlink_socket3.close() +# print('<=== End of Program') \ No newline at end of file diff --git a/src/fc_network_adapter/tests/test_asyncioManager.py b/src/fc_network_adapter/tests/test_asyncioManager.py deleted file mode 100644 index 27a790a..0000000 --- a/src/fc_network_adapter/tests/test_asyncioManager.py +++ /dev/null @@ -1,246 +0,0 @@ -import os -import sys -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) - -import time -import threading -import socket -import random -from pymavlink import mavutil -from pymavlink.dialects.v20 import common as mavlink2 - -from fc_network_adapter.asyncioManager import AsyncIOManager, Connection - -class MockMAVLinkSocket: - """模擬 pymavlink socket 行為的類,用於測試""" - - def __init__(self, name="mock", simulate_errors=False): - self.name = name - self.simulate_errors = simulate_errors - self.is_closed = False - self.received_msgs = [] - self.seq = 0 - - def recv_msg(self): - """模擬接收 MAVLink 消息""" - if self.is_closed: - return None - - # 隨機模擬網絡延遲 - time.sleep(random.uniform(0.01, 0.05)) - - # 偶爾模擬錯誤 - if self.simulate_errors and random.random() < 0.1: - raise Exception("Simulated network error") - - # 70% 的機率返回消息,30% 返回 None (模擬無數據可讀) - if random.random() > 0.3: - msg = self._generate_mavlink_message() - self.received_msgs.append(msg) - return msg - - return None - - def _generate_mavlink_message(self): - """生成模擬 MAVLink 消息""" - # 創建 MAVLink 實例 - mav = mavlink2.MAVLink(self) - - # 設置來源系統和組件 - mav.srcSystem = random.randint(1, 5) - mav.srcComponent = random.randint(1, 5) - - # 創建 HEARTBEAT 消息 - msg = mavlink2.MAVLink_heartbeat_message( - type=mavlink2.MAV_TYPE_QUADROTOR, - autopilot=mavlink2.MAV_AUTOPILOT_GENERIC, - base_mode=0, - custom_mode=0, - system_status=0, - mavlink_version=3 - ) - - # 打包消息 - msg.pack(mav) - self.seq += 1 - return msg - - def write(self, data): - """模擬發送數據""" - if self.is_closed: - raise Exception("Socket is closed") - - # 模擬寫入操作 - time.sleep(random.uniform(0.001, 0.01)) - - # 偶爾模擬錯誤 - if self.simulate_errors and random.random() < 0.1: - raise Exception("Simulated write error") - - return len(data) - - def close(self): - """關閉連接""" - self.is_closed = True - -def create_mock_mavlink_sockets(count=3): - """創建多個模擬 MAVLink socket""" - sockets = [] - for i in range(count): - # 每三個 socket 中有一個模擬錯誤 - simulate_errors = (i % 3 == 0) - socket = MockMAVLinkSocket(f"mock_{i}", simulate_errors) - sockets.append(socket) - return sockets - -def monitor_thread(io_manager): - """監視 AsyncIOManager 狀態的執行緒""" - print("Starting monitoring thread...") - - try: - while True: - # 獲取總體統計資訊 - stats = io_manager.get_stats() - print(f"\n=== AsyncIOManager Stats ===") - print(f"Total connections: {stats.get('connection_count', 0)}") - print(f"Total messages received: {stats.get('rx_count', 0)}") - print(f"Total bytes received: {stats.get('rx_bytes', 0)}") - print(f"Total errors: {stats.get('errors', 0)}") - - # 獲取每個連接的緩衝區狀態 - for socket_id in io_manager.connections: - buffer = io_manager.get_buffer(socket_id) - conn_stats = io_manager.get_stats(socket_id) - - if buffer and not io_manager.connections[socket_id].is_closed: - print(f"Connection {socket_id}: Buffer size={buffer.size()}, " + - f"Messages={conn_stats.get('rx_count', 0)}, " + - f"Last activity={time.time() - io_manager.connections[socket_id].last_activity:.1f}s ago") - - # 每秒更新一次 - time.sleep(1.0) - except KeyboardInterrupt: - print("Monitoring thread stopped.") - -def consumer_thread(io_manager, socket_id): - """從指定連接的緩衝區消費消息""" - print(f"Starting consumer for socket {socket_id}...") - - try: - while True: - # 獲取緩衝區 - buffer = io_manager.get_buffer(socket_id) - if not buffer: - print(f"Consumer {socket_id}: Buffer not found or connection closed") - break - - # 獲取所有消息 - messages = buffer.get_all() - if messages: - print(f"Consumer {socket_id}: Received {len(messages)} messages") - for msg in messages: - # 處理每條消息 (這裡只是示例,實際應用中可能有更複雜的處理) - print(f" - Message from SysID={msg.get_srcSystem()}, CompID={msg.get_srcComponent()}, Type={msg.get_type()}") - - # 短暫休眠,避免 CPU 空轉 - time.sleep(0.2) - except Exception as e: - print(f"Consumer {socket_id} stopped: {e}") - -def send_command_thread(io_manager, socket_ids): - """定期向所有連接發送命令""" - print("Starting command sender thread...") - - # 創建一個 MAVLink 實例用於生成消息 - mav = mavlink2.MAVLink(None) - mav.srcSystem = 255 # GCS system ID - mav.srcComponent = 0 - - try: - count = 0 - while True: - # 等待 2 秒 - time.sleep(2.0) - count += 1 - - # 創建一個命令消息 - msg = mavlink2.MAVLink_command_long_message( - target_system=1, - target_component=1, - command=mavlink2.MAV_CMD_REQUEST_PROTOCOL_VERSION, - confirmation=0, - param1=0, param2=0, param3=0, - param4=0, param5=0, param6=0, param7=0 - ) - - # 向所有連接發送消息 - for socket_id in socket_ids: - if socket_id in io_manager.connections and not io_manager.connections[socket_id].is_closed: - try: - # 打包並發送消息 - result = io_manager.send_message(socket_id, msg) - print(f"Sent command to socket {socket_id}: {'Success' if result else 'Failed'}") - except Exception as e: - print(f"Error sending to socket {socket_id}: {e}") - - # 每 10 次迭代關閉一個連接 (用於演示關閉功能) - if count % 10 == 0 and socket_ids: - socket_id = socket_ids.pop(0) - print(f"Closing connection {socket_id}...") - io_manager.remove_connection(socket_id) - except Exception as e: - print(f"Command sender thread stopped: {e}") - -def main(): - print("AsyncIOManager Example") - print("======================") - - # 創建 AsyncIOManager 實例 - io_manager = AsyncIOManager() - - # 啟動 I/O 管理器 - io_manager.start() - print("AsyncIOManager started") - - # 創建模擬 MAVLink socket - mock_sockets = create_mock_mavlink_sockets(5) - socket_ids = [] - - # 註冊連接 - for i, socket in enumerate(mock_sockets): - socket_id = io_manager.register_connection(socket) - socket_ids.append(socket_id) - print(f"Registered socket {i} with ID {socket_id}") - - # 啟動監視執行緒 - monitor = threading.Thread(target=monitor_thread, args=(io_manager,)) - monitor.daemon = True - monitor.start() - - # 為每個連接啟動消費者執行緒 - consumers = [] - for socket_id in socket_ids: - consumer = threading.Thread(target=consumer_thread, args=(io_manager, socket_id)) - consumer.daemon = True - consumer.start() - consumers.append(consumer) - - # 啟動命令發送執行緒 - sender = threading.Thread(target=send_command_thread, args=(io_manager, socket_ids.copy())) - sender.daemon = True - sender.start() - - print("\nPress Ctrl+C to exit...") - try: - # 運行一段時間然後退出 - time.sleep(30) - except KeyboardInterrupt: - pass - finally: - # 清理資源 - print("\nShutting down...") - io_manager.stop() - print("AsyncIOManager stopped") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/src/fc_network_adapter/tests/test_mavlinkObject.py b/src/fc_network_adapter/tests/test_mavlinkObject.py new file mode 100644 index 0000000..23907ee --- /dev/null +++ b/src/fc_network_adapter/tests/test_mavlinkObject.py @@ -0,0 +1,468 @@ +#!/usr/bin/env python +""" +測試腳本,用於測試 mavlinkObject.py 中的 mavlink_object 和 async_io_manager 類別 +""" + +import os +import sys +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + + +import unittest +import time +import threading +import socket +import asyncio +from unittest.mock import MagicMock, patch + +# 導入要測試的模組 +from fc_network_adapter.mavlinkObject import ( + mavlink_object, + async_io_manager, + MavlinkObjectState, + stream_bridge_ring, + return_packet_ring +) + + +class MockMavlinkSocket: + """模擬 Mavlink Socket 的類別,用於測試""" + + def __init__(self, test_data=None): + self.closed = False + self.test_data = test_data or [] + self.data_index = 0 + self.written_data = [] + + def recv_msg(self): + if not self.test_data or self.data_index >= len(self.test_data): + return None + + data = self.test_data[self.data_index] + self.data_index += 1 + return data + + def write(self, data): + self.written_data.append(data) + + def close(self): + self.closed = True + + +class MockMavlinkMessage: + """模擬 Mavlink 訊息的類別,用於測試""" + + def __init__(self, msg_id, sysid, compid, seq=0): + self.msg_id = msg_id + self.sysid = sysid + self.compid = compid + self.seq = seq + self.msg_buf = bytes([msg_id, sysid, compid, seq]) + + def get_msgId(self): + return self.msg_id + + def get_srcSystem(self): + return self.sysid + + def get_srcComponent(self): + return self.compid + + def get_seq(self): + return self.seq + + def get_msgbuf(self): + return self.msg_buf + + def get_type(self): + return f"MSG_ID_{self.msg_id}" + + +class TestMavlinkObject(unittest.TestCase): + """測試 mavlink_object 類別""" + + def setUp(self): + """在每個測試方法執行前準備環境""" + # 清空全局變數 + mavlink_object.mavlinkObjects = {} + mavlink_object.socket_num = 0 + + # 清空 ring buffer + stream_bridge_ring.clear() + return_packet_ring.clear() + + # 創建測試訊息 + self.heartbeat_msg = MockMavlinkMessage(msg_id=0, sysid=1, compid=1) + self.attitude_msg = MockMavlinkMessage(msg_id=30, sysid=1, compid=1) + self.position_msg = MockMavlinkMessage(msg_id=32, sysid=1, compid=1) + + # 創建模擬的 socket # 假的 Mavlink Socket + self.mock_socket = MockMavlinkSocket([ + self.heartbeat_msg, + self.attitude_msg, + self.position_msg + ]) + + # 創建測試對象 + self.mavlink_obj = mavlink_object(self.mock_socket) + + def test_initialization(self): + """測試 mavlink_object 初始化是否正確""" + # print("Testing mavlink_object initialization") + self.assertEqual(self.mavlink_obj.socket_id, 0) + self.assertEqual(self.mavlink_obj.state, MavlinkObjectState.INIT) + self.assertEqual(len(self.mavlink_obj.target_sockets), 0) + self.assertEqual(self.mavlink_obj.bridge_msg_types, [0]) + + def test_add_remove_target_socket(self): + """測試添加和移除目標端口功能""" + # 添加目標端口 + self.assertTrue(self.mavlink_obj.add_target_socket(1)) + self.assertEqual(len(self.mavlink_obj.target_sockets), 1) + self.assertEqual(self.mavlink_obj.target_sockets[0], 1) + self.assertTrue(self.mavlink_obj.add_target_socket(2)) + self.assertEqual(len(self.mavlink_obj.target_sockets), 2) + self.assertIn(2, self.mavlink_obj.target_sockets) + + # 嘗試添加已存在的端口 + self.assertFalse(self.mavlink_obj.add_target_socket(1)) + self.assertEqual(len(self.mavlink_obj.target_sockets), 2) + + # 嘗試添加自己的端口 + self.assertFalse(self.mavlink_obj.add_target_socket(0)) + self.assertEqual(len(self.mavlink_obj.target_sockets), 2) + + # 移除端口 + self.assertTrue(self.mavlink_obj.remove_target_socket(2)) + self.assertEqual(len(self.mavlink_obj.target_sockets), 1) + + # 嘗試移除不存在的端口 + self.assertFalse(self.mavlink_obj.remove_target_socket(2)) + + def test_set_message_types(self): + """測試設置訊息類型功能""" + # 設置橋接器訊息類型 + self.assertTrue(self.mavlink_obj.set_bridge_message_types([0, 30])) + self.assertEqual(self.mavlink_obj.bridge_msg_types, [0, 30]) + + # 設置回傳處理器訊息類型 + self.assertTrue(self.mavlink_obj.set_return_message_types([32])) + self.assertEqual(self.mavlink_obj.return_msg_types, [32]) + + # 測試無效的訊息類型 + self.assertFalse(self.mavlink_obj.set_bridge_message_types("invalid")) + self.assertFalse(self.mavlink_obj.set_return_message_types([0, "invalid"])) + + def test_send_message(self): + """測試 send_message 功能""" + # 創建一個新的 mavlink_object 實例 + mock_socket = MockMavlinkSocket() + mavlink_obj = mavlink_object(mock_socket) + + # 準備測試數據 + test_message1 = b"test_message_1" + test_message2 = b"test_message_2" + + # 測試初始狀態 + self.assertEqual(len(mock_socket.written_data), 0) + + # 測試非運行狀態下發送消息 + self.assertFalse(mavlink_obj.send_message(test_message1)) + self.assertEqual(len(mock_socket.written_data), 0) + + # 啟動 manager + manager = async_io_manager() + manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 添加對象到 manager + manager.add_mavlink_object(mavlink_obj) + time.sleep(0.1) # 等待對象啟動 + + # 確認對象狀態 + self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING) + + # 測試發送消息 + self.assertTrue(mavlink_obj.send_message(test_message1)) + time.sleep(0.2) # 等待消息處理 + + # 確認消息已發送 + self.assertEqual(len(mock_socket.written_data), 1) + self.assertEqual(mock_socket.written_data[0], test_message1) + + # 測試連續發送多條消息 + self.assertTrue(mavlink_obj.send_message(test_message2)) + time.sleep(0.2) # 等待消息處理 + + # 確認兩條消息都已發送 + self.assertEqual(len(mock_socket.written_data), 2) + self.assertEqual(mock_socket.written_data[1], test_message2) + + # 模擬發送出錯的情況 + class ErrorWriteSocket(MockMavlinkSocket): + def write(self, data): + raise Exception("Write error") + + error_socket = ErrorWriteSocket() + error_obj = mavlink_object(error_socket) + manager.add_mavlink_object(error_obj) + time.sleep(0.1) # 等待對象啟動 + + # 發送消息到錯誤的 socket + self.assertTrue(error_obj.send_message(test_message1)) + time.sleep(0.2) # 等待消息處理 + + # 即使寫入失敗,send_message 應該也返回 True + # 因為消息已成功加入到佇列中,只是後續的實際發送失敗 + + # 停止 manager + manager.stop() + time.sleep(0.5) # 等待 manager 停止 + + # 測試對象已關閉後發送消息 + self.assertFalse(mavlink_obj.send_message(test_message1)) + self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加 + +class TestAsyncIOManager(unittest.TestCase): + """測試 async_io_manager 類別""" + + def setUp(self): + """在每個測試方法執行前準備環境""" + # 清空全局變數 + mavlink_object.mavlinkObjects = {} + mavlink_object.socket_num = 0 + + # 清空 ring buffer + stream_bridge_ring.clear() + return_packet_ring.clear() + + # 創建 async_io_manager 實例 + self.manager = async_io_manager() + + # 模擬 mavlink 對象 + self.mock_socket1 = MockMavlinkSocket([ + MockMavlinkMessage(msg_id=0, sysid=1, compid=1), + MockMavlinkMessage(msg_id=30, sysid=1, compid=1) + ]) + self.mock_socket2 = MockMavlinkSocket([ + MockMavlinkMessage(msg_id=0, sysid=2, compid=1) + ]) + + self.mavlink_obj1 = mavlink_object(self.mock_socket1) + self.mavlink_obj2 = mavlink_object(self.mock_socket2) + + def tearDown(self): + """在每個測試方法執行後清理環境""" + if self.manager.running: + self.manager.stop() + + def test_singleton_pattern(self): + """測試 async_io_manager 的單例模式 就是不會產生兩個 magager""" + manager1 = async_io_manager() + manager2 = async_io_manager() + self.assertIs(manager1, manager2) + + def test_start_stop(self): + """測試 async_io_manager 的啟動和停止功能""" + # 啟動管理器 + self.manager.start() + self.assertTrue(self.manager.running) + self.assertIsNotNone(self.manager.thread) + + # 再次啟動應該沒有效果 + old_thread = self.manager.thread + self.manager.start() + self.assertIs(self.manager.thread, old_thread) + + # 停止管理器 + self.manager.stop() + self.assertFalse(self.manager.running) + + # 最多等待 5 秒讓線程結束 + start_time = time.time() + while self.manager.thread.is_alive() and time.time() - start_time < 5: + time.sleep(0.1) + self.assertFalse(self.manager.thread.is_alive()) + + def test_add_remove_objects(self): + """測試添加和移除 mavlink_object""" + # 啟動管理器 + self.manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 添加對象 + self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj1)) + self.assertEqual(len(self.manager.managed_objects), 1) + self.assertEqual(self.mavlink_obj1.state, MavlinkObjectState.RUNNING) + + # 添加另一個對象 + self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj2)) + self.assertEqual(len(self.manager.managed_objects), 2) + + # 檢查受管理對象列表 + managed_objects = self.manager.get_managed_objects() + self.assertEqual(len(managed_objects), 2) + self.assertIn(0, managed_objects) + self.assertIn(1, managed_objects) + + # 移除對象 + self.assertTrue(self.manager.remove_mavlink_object(0)) + self.assertEqual(len(self.manager.managed_objects), 1) + + # 嘗試移除不存在的對象 + self.assertFalse(self.manager.remove_mavlink_object(999)) + + # 停止管理器 + self.manager.stop() + + def test_data_processing(self): + """測試數據處理""" + # 創建用於轉發的 mavlink_objects + mock_socket3 = MockMavlinkSocket() + mavlink_obj3 = mavlink_object(mock_socket3) + + # 設置轉發 + self.mavlink_obj1.add_target_socket(2) # socket1 轉發到 socket3 + + # 設置訊息類型 + self.mavlink_obj1.set_bridge_message_types([0, 30]) # HEARTBEAT, ATTITUDE + # self.mavlink_obj1.enable_return(True) + self.mavlink_obj1.set_return_message_types([30]) # ATTITUDE + + # 啟動管理器並添加對象 + self.manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + self.manager.add_mavlink_object(self.mavlink_obj1) + self.manager.add_mavlink_object(mavlink_obj3) + + # 等待處理完成 + time.sleep(1.0) + + # print("testing Mark A") + + # 檢查 Ring buffer 是否有正確的數據 + self.assertEqual(stream_bridge_ring.size(), 2) # HEARTBEAT + ATTITUDE + self.assertEqual(return_packet_ring.size(), 1) # ATTITUDE + + a = stream_bridge_ring.get() + print(f"stream_bridge_ring: {a}") + + # 檢查是否正確轉發 + self.assertEqual(len(mock_socket3.written_data), 2) # HEARTBEAT + ATTITUDE + + # print("testing Mark B") + + # 停止管理器 + self.manager.stop() + + def test_error_handling(self): + """測試錯誤處理情況""" + print("=== mark A ===") + # 創建一個會引發異常的 socket + class ErrorSocket(MockMavlinkSocket): + def recv_msg(self): + raise Exception("Test exception") + + error_socket = ErrorSocket() + mavlink_obj_err = mavlink_object(error_socket) + + # 啟動管理器並添加對象 + self.manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + self.manager.add_mavlink_object(mavlink_obj_err) + + print("=== mark B ===") + + # 等待錯誤處理 + time.sleep(1.0) + + # 對象應該進入錯誤狀態,但不會崩潰 + # self.assertEqual(mavlink_obj_err.state, MavlinkObjectState.ERROR) + + print("=== mark C ===") + + # 管理器應該仍在運行 + self.assertTrue(self.manager.running) + + + # 故意等一段時間 確認 socket 有被 manager 處理掉 + time.sleep(3) + + # 停止管理器 + self.manager.stop() + + +class TestIntegration(unittest.TestCase): + """整合測試,測試多個 mavlink_object 之間的互動""" + + def test_bidirectional_forwarding(self): + """測試雙向轉發""" + # 清空全局變數和 ring buffer + mavlink_object.mavlinkObjects = {} + mavlink_object.socket_num = 0 + stream_bridge_ring.clear() + return_packet_ring.clear() + + # 創建三個 mavlink 對象,模擬三個通道 + socket1 = MockMavlinkSocket([ + MockMavlinkMessage(msg_id=0, sysid=1, compid=1), + MockMavlinkMessage(msg_id=30, sysid=1, compid=1) + ]) + socket2 = MockMavlinkSocket([ + MockMavlinkMessage(msg_id=0, sysid=2, compid=1), + MockMavlinkMessage(msg_id=32, sysid=2, compid=1) + ]) + socket3 = MockMavlinkSocket([ + MockMavlinkMessage(msg_id=0, sysid=3, compid=1), + MockMavlinkMessage(msg_id=33, sysid=3, compid=1) + ]) + + obj1 = mavlink_object(socket1) + obj2 = mavlink_object(socket2) + obj3 = mavlink_object(socket3) + + # 設置雙向轉發 + # obj1 <-> obj2 <-> obj3 + obj1.add_target_socket(1) # obj1 -> obj2 + obj2.add_target_socket(0) # obj2 -> obj1 + obj2.add_target_socket(2) # obj2 -> obj3 + obj3.add_target_socket(1) # obj3 -> obj2 + + # 啟動 async_io_manager + manager = async_io_manager() + manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 添加所有 mavlink_object + manager.add_mavlink_object(obj1) + manager.add_mavlink_object(obj2) + manager.add_mavlink_object(obj3) + + # 等待處理所有訊息 + time.sleep(1.5) + + # 檢查轉發結果 + # socket1 應該收到 socket2 和 socket3 的訊息 + self.assertEqual(len(socket1.written_data), 4) # 2 from obj2, 2 from obj3 via obj2 + + # socket2 應該收到 socket1 和 socket3 的訊息 + self.assertEqual(len(socket2.written_data), 4) # 2 from obj1, 2 from obj3 + + # socket3 應該收到 socket1 和 socket2 的訊息 + self.assertEqual(len(socket3.written_data), 4) # 2 from obj1 via obj2, 2 from obj2 + + # 檢查 ring buffer 的數據 + # 假設所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0] + # 應該有 3 個 HEARTBEAT 訊息 + self.assertEqual(stream_bridge_ring.size(), 3) # 3 HEARTBEAT + + # 停止管理器 + manager.stop() + + + + +if __name__ == "__main__": + unittest.main(defaultTest="TestMavlinkObject.test_send_message")