diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index f84ce60..3421a0d 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -10,7 +10,6 @@ import time ''' - # 不同的匯流排只有單一種通訊協定 # 匯流排接到訊息後透過 queue stack 傳送到分析器 # 會有三種 Queue 分別作為 1. 固定串流分析器 2. 回傳封包處理器 3. 轉格式(例如 mavlink to FDM)的轉換器 @@ -21,13 +20,14 @@ import time # pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlinkObject 裡面 # 這邊的 main 會是用來初始化 mavlinkObject 並啟動他的 run function - ''' fixed_stream_analyzer_queue = queue.Queue() return_packet_processor_queue = queue.Queue() converte_format_queues = [] +mavlink_systems = {} # 用來記錄每個 system 的資訊 + class MavlinkObject(): def __init__(self, mavlink_socket, socket_id = None): self.socket_id = socket_id @@ -35,18 +35,24 @@ class MavlinkObject(): self.msg_count = {} self.multiplexingList = [] + # 關聯到全域變數 + global mavlink_systems + self.mavlink_systems = mavlink_systems + + # 這三個 list 用來分配不同的訊息到不同的 queue self.multiplexingToAnalysis = [ - # 0, # HEARTBEAT + 0, # HEARTBEAT + 24, # GPS_RAW_INT 30, # ATTITUDE 33, # GLOBAL_POSITION_INT - # 147, # BATTERY_STATUS + 74, # VFR_HUD + 147, # BATTERY_STATUS ] self.multiplexingToReturn = [ 0, # HEARTBEAT # 30, # ATTITUDE ] self.multiplexingToConvert = [ - [74, ] # VFR_HUD ] def __del__(self): self.mavlink_socket.close() @@ -66,6 +72,7 @@ class MavlinkObject(): def _run_thread(self): start_time = time.time() while self.running: + timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差 try: mavlinkMsg = self.mavlink_socket.recv_msg() except Exception as e: @@ -75,7 +82,13 @@ class MavlinkObject(): break if mavlinkMsg: - self.count_mavlink_type(mavlinkMsg) # 統計收到的訊息 + # 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間 + sysid = mavlinkMsg.get_srcSystem() + compid = mavlinkMsg.get_srcComponent() + if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息 + # mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型 + mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) + # print(mavlinkMsg) # data type 'pymavlink.dialects.v20.ardupilotmega.MAVLink_attitude_message' # debug # break # Debug @@ -83,115 +96,313 @@ class MavlinkObject(): for i in range(len(self.multiplexingList)): if mavlinkMsg.get_msgId() in self.multiplexingList[i]: if i == 0: - fixed_stream_analyzer_queue.put((self.socket_id,mavlinkMsg)) + fixed_stream_analyzer_queue.put((self.socket_id,timestamp,mavlinkMsg)) elif i == 1: - return_packet_processor_queue.put((self.socket_id,mavlinkMsg)) + return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg)) else: convert_queue = converte_format_queues[i-2] - convert_queue.put((self.socket_id,mavlinkMsg)) - - # 每秒中 統計一次 收到的訊息總量 - elapsed_time = time.time() - start_time - if elapsed_time > 1: - print('[mavlinkObject.py] Messages Type received (in about 1 sec) :') - print(self.msg_count) - start_time = time.time() - self.msg_count = {} - - # thread 結束 - print('[mavlinkObject.py] End of _run_thread') + convert_queue.put((self.socket_id,timestamp,mavlinkMsg)) - def count_mavlink_type(self, mavlinkMsg): - msg_type = mavlinkMsg.get_type() - if msg_type in self.msg_count: - self.msg_count[msg_type] += 1 - else: - self.msg_count[msg_type] = 1 - - def publish_mavlink_data(self, mavlinkMsg): - msg = String() - msg.data = str(mavlinkMsg) - self.publisher_.publish(msg) + # thread 結束 + print('[mavlinkObject.py] End of MavlinkObject._run_thread') def updateMultiplexingList(self): + ''' + 更新 multiplexing list 並做簡單的檢查 + ''' + # 更新 multiplexing list self.multiplexingList = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToConvert + # 檢查 multiplexing list 格式是否有錯誤 check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in self.multiplexingList) - - # 若有錯誤則回傳 False if not check: - print('[mavlinkObject.py] Multiplexing List Error, Please check the list') + print('[mavlinkObject.py] Multiplexing List Format Error, Please check the list') + return False # 若有錯誤則回傳 False + + check = len(self.multiplexingToConvert) != len(converte_format_queues) + if check: + print('[mavlinkObject.py] Multiplexing Queue not fit List , Please check the list') return False return True +# ===================== 分割線 ===================================== + class mavlink_analyzer(Node): ''' - 這個 class 是用來接收 mavlink 訊息 並進行分析 + 這個 class 就是 固定串流分析器 + 是用來接收 mavlink 訊息 並進行分析 + 這個地方是針對 fixed_stream_analyzer_queue 的資料做處理的 記錄有 mavlink bus 上有那些 system id 和 component id 為了每個 system id 都有一個對應的 device object 並且看是否有重複 system id - 也許可以用 node 去創建 這樣就可以直接進到 ros2 執行緒? + 整段代碼包含兩大區塊 thread 和 node + + thread 區塊內會對 fixed_stream_analyzer_queue 進行監聽 並且將收到的訊息進行處理 + 其中 HEARTBEAT 是一個特殊類別 用來初始化整個 device object + + node 區塊則是處理 ros2 的 publisher 和 subscriber 訂閱相關 + 藉由控制 ros2 的機制再把 device object 的資訊發送出去 + + ps. 我限制了這個 class 只能有一個 instance + ''' + _instance = None + _lock = threading.Lock() # 確保多線程安全 + + def __new__(cls, *args, **kwargs): + with cls._lock: # 確保多線程環境下只有一個實例被創建 + if cls._instance is None: + cls._instance = super(mavlink_analyzer, cls).__new__(cls) + return cls._instance def __init__(self): - super().__init__('mavlink_analyzer') - - # self.publisher_ = self.create_publisher(String, 'mavlink_data', 10) - # self.mavlink_object = None - # self.create_mavlink_object() + if not hasattr(self, "initialized"): # 防止重複初始化 + self.initialized = True + rclpy.init() + super().__init__('mavlink_analyzer') + + # 關聯到全域變數 + global mavlink_systems + self.mavlink_systems = mavlink_systems + + # 當 object 建立時會直接運行 thread 直到消滅 + self.running = True + self.thread = threading.Thread(target=self._run_thread) + self.thread.start() + else: + print('[mavlinkObject.py] WARNING : mavlink_analyzer instance already exists') + def stop(self): + self.running = False - def create_mavlink_object(self): - # connection_string="udp: + # === Thread 區塊 === + def _run_thread(self): + start_time = time.time() # debug + # 從 Queue fixed_stream_analyzer_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理 + while self.running: + if fixed_stream_analyzer_queue.empty(): + continue + msg_pack = fixed_stream_analyzer_queue.get() + + msg = msg_pack[2] + sysid = msg.get_srcSystem() + + if msg.get_msgId() == 0: # HEARTBEAT 處理 + + # 若這個 system id 還不存在 執行完整建立 device object 的流程 + if not sysid in self.mavlink_systems: + device_object = mavlink_device() # 創建一個新的 device object + self.mavlink_systems[sysid] = device_object + device_object.socket_id = msg_pack[0] + device_object.sysid = sysid + + this_component = device_object.mavlink_component() # 創建一個新的 component object + device_object.components[msg.get_srcComponent()] = this_component + this_component.mav_type = msg.type + this_component.mav_autopilot = msg.autopilot + + + # 初始化 emitParams 方便之後更新與利用 + this_component.emitParams.add_parameter('base_mode') + this_component.emitParams.add_parameter('flight_mode') + + # TODO 這邊先不過度設計 但是也許之後要依照不同的載具類型再定義 + # for BATTERY_STATUS (147) + this_component.emitParams.add_parameter('battery_voltage') + # for GPS_RAW_INT (24) + this_component.emitParams.add_parameter('gps_fix_type') + # for ATTITUDE (30) + this_component.emitParams.add_parameter('roll') + this_component.emitParams.add_parameter('pitch') + this_component.emitParams.add_parameter('yaw') + # for GLOBAL_POSITION_INT (33) + this_component.emitParams.add_parameter('latitude') + this_component.emitParams.add_parameter('longitude') + this_component.emitParams.add_parameter('altitude') + # for VFR_HUD (74) + this_component.emitParams.add_parameter('heading') + + this_component.emitParams.base_mode = msg.base_mode + this_component.emitParams.flight_mode = mavutil.mode_string_v10(msg) + + elif msg.get_msgId() == 147: # BATTERY_STATUS 處理 + this_component = self.mavlink_systems[sysid].components[msg.get_srcComponent()] + + + # 若未定義的訊息類型則不處理 並跳出訊息 + else: + print('[mavlinkObject.py] Warning This Message Type Did not define process method : ',msg.get_msgId(), get_type()) + continue + + + print('[mavlinkObject.py] End of mavlink_analyzer._run_thread') + + # === Node 區塊 === + def _init_node(self): pass + # self.publisher = self.create_publisher(String, 'mavlink_analyzer', 10) + # self.subscription = self.create_subscription(String, 'mavlink_analyzer', self.listener_callback, 10) + + def emit_info(self): + # 這邊將 mavlink_systems 內所有的 device object 內所有的 component 輪循過 + # 把 emitParams 的參數發送出去 + for sysid in self.mavlink_systems: + for compid in self.mavlink_systems[sysid].components: + + # 先抓 base_mode 驗證看看算法是否正確 + msg_str = self.mavlink_systems[sysid].components[compid].base_mode + self.publisher.publish(msg_str) + # self.get_logger().info('Publishing: "%s"' % msg.data) + +# 這個 class 是用來記錄每個 system 的資訊 每個 system 可能會有多個 component class mavlink_device(): def __init__(self): self.socket_id = None # 記錄來自於哪個 socket self.sysid = None - self.component_count = 1 - self.compid_list = [] - + self.components = {} # 用來記錄每個 component 的資訊 key 是 compid, value 是 component object + + def __str__(self): + p_str = '' + p_str += f'socket_id : {self.socket_id}\n' + p_str += f'sysid : {self.sysid}\n' + p_str += 'has components : \n' + for compid in self.components: + p_str += f'compid : {compid}\n' + p_str += f'mav_type : {self.components[compid].mav_type}\n' + p_str += '=====================\n' + return p_str + + def updateComponentPacketCount(self, compid, current_seq, current_type, current_time): + # 這段檢查遺失封包 + try: + last_seq = self.components[compid].msg_seq + except KeyError: + # 這個 component id 還不存在 + print('[mavlinkObject.py] Warning System ID : {} This Component ID : {} Did not init yet'.format(self.sysid, compid)) + return + if last_seq != None: + expected_seq = (last_seq + 1) % 256 + diff = current_seq - expected_seq + # print("current last exp diff : ",current_seq, last_seq, expected_seq, diff) # debug + if diff < 0: + diff += 256 + self.components[compid].lost_packet_count += diff + + # 這段更新封包的基本資訊 + self.components[compid].msg_seq = current_seq + self.components[compid].last_msg_time = current_time + if current_type in self.components[compid].msg_count: + self.components[compid].msg_count[current_type] += 1 + else: + self.components[compid].msg_count[current_type] = 1 + + def resetComponentPacketCount(self, compid): + self.components[compid].msg_count = {} + self.components[compid].msg_seq = None + self.components[compid].lost_packet_count = 0 + + class mavlink_component(): + # 這個 class 用來存放該 component 參數,然而這些參數是固定以一個頻率要輸出給 ros2 topic + class emitParamSet(): + def __init__(self): + pass + def add_parameter(self, param_name): + setattr(self, param_name, None) + + def __init__(self): + self.mav_type = None # 表示 Vehicle 或 component type (例如: 旋翼機是2, 雲台是26, GCS是6) + self.mav_autopilot = None # 表示 autopilot type (例如: ardupilot是3, px4是12) + self.mav_system_status = None # 表示 system status (例如: active是3, standby是4) + + # 紀錄從這個 component 收到最後的訊息序號和時間 + self.msg_count = {} + self.msg_seq = None + self.lost_packet_count = 0 + self.last_msg_time = 0 + + # 存放該 emit component 參數的區域 + self.emitParams = self.emitParamSet() def main(args=None): + # print('Start of Program .Test 1') + # # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來 + # connection_string="udp:127.0.0.1:14550" + # mavlink_socket = mavutil.mavlink_connection(connection_string) + + # mavlink_object = MavlinkObject(mavlink_socket, 1) + # mavlink_object.multiplexingToAnalysis = [30] # only ATTITUDE + # mavlink_object.multiplexingToReturn = [0] # only HEARTBEAT + # mavlink_object.multiplexingToConvert = [[74,]] # only VFR_HUD + + # # 做一個空的 queue list 驗證 multiplexingToConvert + # converte_format_queues.append(queue.Queue()) + # # 啟動連線的模組 + # mavlink_object.run() + + # # 運行幾秒並印出 queue 的資料 + # start_time = time.time() + # while time.time() - start_time < 2: + + # while not fixed_stream_analyzer_queue.empty(): + # print('fixed_stream_analyzer_queue:') + # t = fixed_stream_analyzer_queue.get() + # print('from {} : {}'.format(t[0],t[2])) + # while not return_packet_processor_queue.empty(): + # print('return_packet_processor_queue:') + # t = return_packet_processor_queue.get() + # # print('from {} : {}'.format(t[0],t[2])) + # print(t[2].type) + + # for q in converte_format_queues: + # while not q.empty(): + # print('converte_format_queues:') + # t = q.get() + # print('from {} : {}'.format(t[0],t[2])) + + # # 結束程式 退出所有 thread + # mavlink_object.running = False + # mavlink_object.thread.join() + # mavlink_socket.close() + # print('End of Program .Test 1') + + # # # ======================================================================== + print('Start of Program .Test 2') connection_string="udp:127.0.0.1:14550" mavlink_socket = mavutil.mavlink_connection(connection_string) - # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來 mavlink_object = MavlinkObject(mavlink_socket, 1) - mavlink_object.multiplexingToAnalysis = [30] # only ATTITUDE - mavlink_object.multiplexingToReturn = [0] # only HEARTBEAT - mavlink_object.multiplexingToConvert = [[74,]] # only VFR_HUD - - converte_format_queues.append(queue.Queue()) + mavlink_object.multiplexingToAnalysis = [0] # only HEARTBEAT + mavlink_object.multiplexingToReturn = [] # only HEARTBEAT + mavlink_object.multiplexingToConvert = [] # + # 啟動連線的模組 mavlink_object.run() - start_time = time.time() - while time.time() - start_time < 2: - - while not fixed_stream_analyzer_queue.empty(): - print('fixed_stream_analyzer_queue:') - t = fixed_stream_analyzer_queue.get() - print('from {} : {}'.format(t[0],t[1])) - while not return_packet_processor_queue.empty(): - print('return_packet_processor_queue:') - t = return_packet_processor_queue.get() - print('from {} : {}'.format(t[0],t[1])) - - for q in converte_format_queues: - while not q.empty(): - print('converte_format_queues:') - t = q.get() - print('from {} : {}'.format(t[0],t[1])) - + # 啟動 mavlink_analyzer + analyzer = mavlink_analyzer() + compid = 1 - mavlink_object.running = False + # 運行幾秒 + start_time = time.time() + show_time = time.time() + while time.time() - start_time < 10: + if (time.time() - show_time) >= 1: + show_time = time.time() + for sysid in analyzer.mavlink_systems: + print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count) + print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count) + print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams.flight_mode) + analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) + + # 結束程式 退出所有 thread + mavlink_object.stop() mavlink_object.thread.join() - print('End of Program') - + analyzer.stop() + analyzer = mavlink_analyzer() # 這邊是測試是否只有一個 instance + mavlink_socket.close() if __name__ == '__main__': main() \ No newline at end of file diff --git a/src/unitdev02/unitdev02/test01.py b/src/unitdev02/unitdev02/test01.py index c1b5276..e00d946 100644 --- a/src/unitdev02/unitdev02/test01.py +++ b/src/unitdev02/unitdev02/test01.py @@ -205,6 +205,21 @@ def mavlink_btye_generator_test(): custom_mode=0, system_status=mavutil.mavlink.MAV_STATE_ACTIVE ) + msg = mav.command_long_encode( + target_system=1, + target_component=1, + command=mavutil.mavlink.MAV_CMD_DO_SET_MODE, + confirmation=0, + param1=0, # Custom mode + param2=0, # Custom sub-mode + param3=0, + param4=0, + param5=0, + param6=0, + param7=0 + ) + + # 取得 MAVLink 訊息的 bytes mavlink_bytes = msg.pack(mav) @@ -219,6 +234,19 @@ def mavlink_btye_generator_test(): print(dir(mav2)) # print(my_msg) +def simple_getMavlink(): + connection_string="udp:127.0.0.1:14550" + # connection_string='/dev/ttyUSB0' + connection = mavutil.mavlink_connection(connection_string) + + while True: + msg = connection.recv_msg() + if msg: + print(msg) + else: + print('No message yet, 0.1 second for data to fill') + sleep(0.1) + print('Start') @@ -226,5 +254,6 @@ print('Start') # fdm_parser_example() # mavlink_analyzer_simple(8) # mavlink_btye_generator_test() +simple_getMavlink()