完成固定串流分析器的基礎結構並連結到 node topic

更嚴格的模組化 分析器 & socket adapter(mavlink object) & device 之間
程式可測試進入點為 mavlinkObject.py  依開發進度提供三個測試項目
新增 logger 功能 記錄執行狀況
chiyu
Chiyu Chen 1 year ago
parent b6c730f74a
commit bdd6cceb96

3
.gitignore vendored

@ -8,7 +8,8 @@
# 編譯的資料夾
**/build/
**/install/
log/
**/log/
**/logs/
# CMAKE 的衍生檔
CMakeCache.txt

@ -6,9 +6,13 @@
2. Python
3. Ardupilot
===
必要相依套件
ROS2 預啟動
1. source ~/ros2_humble/install/setup.bash
2.
===
建議的開發測試
@ -27,4 +31,3 @@ Package 簡述
同時處理與 Gazebo 的 ardupilot_plugin 溝通的 FDM/JSON 訊息
處理無線模組的通訊格式 (XBee)

@ -0,0 +1,84 @@
from theLogger import setup_logger
logger = setup_logger("mavlinkDevice.py")
# 這個 class 是用來記錄每個 system 的資訊 每個 system 可能會有多個 component
class mavlink_device():
def __init__(self):
self.socket_id = None # 記錄來自於哪個 socket
self.sysid = None
self.components = {} # 用來記錄每個 component 的資訊 key 是 compid, value 是 component object
def __str__(self):
p_str = ''
p_str += f'socket_id : {self.socket_id}\n'
p_str += f'sysid : {self.sysid}\n'
p_str += 'has components : \n'
for compid in self.components:
p_str += f'compid : {compid}\n'
p_str += f'mav_type : {self.components[compid].mav_type}\n'
p_str += '=====================\n'
return p_str
def updateComponentPacketCount(self, compid, current_seq, current_type, current_time):
# 這段檢查遺失封包
try:
last_seq = self.components[compid].msg_seq
except KeyError:
# 這個 component id 還不存在
logger.error('System ID : {} This Component ID : {} Did not init yet'.format(self.sysid, compid))
return
if last_seq != None:
expected_seq = (last_seq + 1) % 256
diff = current_seq - expected_seq
# print("current last exp diff : ",current_seq, last_seq, expected_seq, diff) # debug
if diff < 0:
diff += 256
self.components[compid].lost_packet_count += diff
# 這段更新封包的基本資訊
self.components[compid].msg_seq = current_seq
self.components[compid].last_msg_time = current_time
if current_type in self.components[compid].msg_count:
self.components[compid].msg_count[current_type] += 1
else:
self.components[compid].msg_count[current_type] = 1
def resetComponentPacketCount(self, compid):
self.components[compid].msg_count = {}
self.components[compid].msg_seq = None
self.components[compid].lost_packet_count = 0
class mavlink_component():
# 程式用不到的參數 但是做個記錄
# paraEmitList = ['base_mode', 'flightMode_mode',
# 'battery_voltage', # from BATTERY_STATUS (147)
# 'gps_fix_type', # from GPS_RAW_INT (24)
# 'roll', 'pitch', 'yaw', # from ATTITUDE (30)
# 'position_latitude', 'position_longitude', 'position_altitude', # from GLOBAL_POSITION_INT (33)
# 'heading', # for VFR_HUD (74)
# ]
def __init__(self):
self.mav_type = None # 表示 Vehicle 或 component type (例如: 旋翼機是2, 雲台是26, GCS是6)
self.mav_autopilot = None # 表示 autopilot type (例如: ardupilot是3, px4是12)
self.mav_system_status = None # 表示 system status (例如: active是3, standby是4)
# 紀錄從這個 component 收到最後的訊息序號和時間
self.msg_count = {}
self.msg_seq = None
self.lost_packet_count = 0
self.last_msg_time = 0
# 存放該 emit component 參數的區域
# 內容格式為 {param_name(字串) : param_value}
# param_name 請見上面 paraEmitList
self.emitParams = {}
# 用來存放每個 topic 的 publisher
# 內容格式 為 {topic_name(字串) : [publisher(物件), method(函式)]} (?
# 內容格式 為 {publisher(物件) : method(函式)} (?
# 還在測試哪個比較好
self.publishers = {}

@ -1,14 +1,3 @@
import threading
from pymavlink import mavutil
import queue
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
'''
# 不同的匯流排只有單一種通訊協定
# 匯流排接到訊息後透過 queue stack 傳送到分析器
@ -17,18 +6,41 @@ import time
# 分析器會將解析得到的結論 再透過 ros2 的 publisher 發送出去
# 關於 mavlink 採用 pymavlink 這個套件 製作匯流排
# pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlinkObject 裡面
# 這邊的 main 會是用來初始化 mavlinkObject 並啟動他的 run function
# pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlink_object 裡面
# 這邊的 main 會是用來初始化 mavlink_object 並啟動他的 run function
'''
# 基礎功能的 import
import threading
import queue
import time
# mavlink 的 import
from pymavlink import mavutil
# ROS2 的 import
from rclpy.node import Node
import std_msgs.msg
# 自定義的 import
from mavlinkDevice import mavlink_device
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
fixed_stream_analyzer_queue = queue.Queue()
return_packet_processor_queue = queue.Queue()
converte_format_queues = []
mavlink_systems = {} # 用來記錄每個 system 的資訊
# 用來記錄每個 system 的資訊
# 資料格式 { sysid : mavlink_device object }
mavlink_systems = {}
class MavlinkObject():
class mavlink_object():
def __init__(self, mavlink_socket, socket_id = None):
self.socket_id = socket_id
self.mavlink_socket = mavlink_socket
@ -60,7 +72,7 @@ class MavlinkObject():
def run(self):
if not self.socket_id:
print('[mavlinkObject.py] Please set socket id before running')
logger.error('Please set socket id before running')
return
self.thread = threading.Thread(target=self._run_thread)
self.running = self.updateMultiplexingList()
@ -70,18 +82,20 @@ class MavlinkObject():
self.running = False
def _run_thread(self):
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
start_time = time.time()
while self.running:
timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差
try:
mavlinkMsg = self.mavlink_socket.recv_msg()
except Exception as e:
logger.critical(f"Receiving data not mavlink format. Object Delete.")
print(f"[mavlinkObject.py] Error receiving mavlink data: {e}")
print(mavlinkMsg)
self.running = False
break
if mavlinkMsg:
if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega.MAVLink_attitude_message'
# 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間
sysid = mavlinkMsg.get_srcSystem()
compid = mavlinkMsg.get_srcComponent()
@ -89,7 +103,7 @@ class MavlinkObject():
# mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型
mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp)
# print(mavlinkMsg) # data type 'pymavlink.dialects.v20.ardupilotmega.MAVLink_attitude_message' # debug
# print(mavlinkMsg) # debug
# break # Debug
# 將訊息依照 multiplexing list 分發到不同的 queue
@ -104,7 +118,7 @@ class MavlinkObject():
convert_queue.put((self.socket_id,timestamp,mavlinkMsg))
# thread 結束
print('[mavlinkObject.py] End of MavlinkObject._run_thread')
logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id))
def updateMultiplexingList(self):
'''
@ -117,18 +131,18 @@ class MavlinkObject():
# 檢查 multiplexing list 格式是否有錯誤
check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in self.multiplexingList)
if not check:
print('[mavlinkObject.py] Multiplexing List Format Error, Please check the list')
logger.error('Multiplexing List Format Error, Please check the list')
return False # 若有錯誤則回傳 False
check = len(self.multiplexingToConvert) != len(converte_format_queues)
if check:
print('[mavlinkObject.py] Multiplexing Queue not fit List , Please check the list')
logger.error('Multiplexing Queue not fit List , Please check the list')
return False
return True
# ===================== 分割線 =====================================
# ===================== 分割線 =====================
class mavlink_analyzer(Node):
class mavlink_analyzer(Node, mavlink_publisher):
'''
這個 class 就是 固定串流分析器
是用來接收 mavlink 訊息 並進行分析
@ -159,8 +173,6 @@ class mavlink_analyzer(Node):
def __init__(self):
if not hasattr(self, "initialized"): # 防止重複初始化
self.initialized = True
rclpy.init()
super().__init__('mavlink_analyzer')
# 關聯到全域變數
global mavlink_systems
@ -171,13 +183,14 @@ class mavlink_analyzer(Node):
self.thread = threading.Thread(target=self._run_thread)
self.thread.start()
else:
print('[mavlinkObject.py] WARNING : mavlink_analyzer instance already exists')
logger.error('mavlink_analyzer instance already exists. Do not create another one.')
def stop(self):
self.running = False
# === Thread 區塊 ===
def _run_thread(self):
start_time = time.time() # debug
# start_time = time.time() # debug
logger.info('Start of mavlink_analyzer._run_thread')
# 從 Queue fixed_stream_analyzer_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
while self.running:
if fixed_stream_analyzer_queue.empty():
@ -187,6 +200,8 @@ class mavlink_analyzer(Node):
msg = msg_pack[2]
sysid = msg.get_srcSystem()
# ↓↓↓↓↓↓↓↓↓↓↓↓ 處理不同訊息類型的功能寫在這裡 請加在這個 elif 之內 ↓↓↓↓↓↓↓↓↓↓↓↓
if msg.get_msgId() == 0: # HEARTBEAT 處理
# 若這個 system id 還不存在 執行完整建立 device object 的流程
@ -201,208 +216,199 @@ class mavlink_analyzer(Node):
this_component.mav_type = msg.type
this_component.mav_autopilot = msg.autopilot
# 初始化 emitParams 方便之後更新與利用
this_component.emitParams.add_parameter('base_mode')
this_component.emitParams.add_parameter('flight_mode')
# TODO 這邊先不過度設計 但是也許之後要依照不同的載具類型再定義
# for BATTERY_STATUS (147)
this_component.emitParams.add_parameter('battery_voltage')
# for GPS_RAW_INT (24)
this_component.emitParams.add_parameter('gps_fix_type')
# for ATTITUDE (30)
this_component.emitParams.add_parameter('roll')
this_component.emitParams.add_parameter('pitch')
this_component.emitParams.add_parameter('yaw')
# for GLOBAL_POSITION_INT (33)
this_component.emitParams.add_parameter('latitude')
this_component.emitParams.add_parameter('longitude')
this_component.emitParams.add_parameter('altitude')
# for VFR_HUD (74)
this_component.emitParams.add_parameter('heading')
this_component.emitParams.base_mode = msg.base_mode
this_component.emitParams.flight_mode = mavutil.mode_string_v10(msg)
this_component.emitParams['base_mode'] = msg.base_mode
this_component.emitParams['flightMode_mode'] = mavutil.mode_string_v10(msg)
elif msg.get_msgId() == 147: # BATTERY_STATUS 處理
this_component = self.mavlink_systems[sysid].components[msg.get_srcComponent()]
# ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同訊息類型的功能寫在這裡 請加在這個 elif 之內 ↑↑↑↑↑↑↑↑↑↑↑↑
# 若未定義的訊息類型則不處理 並跳出訊息
else:
print('[mavlinkObject.py] Warning This Message Type Did not define process method : ',msg.get_msgId(), get_type())
logger.warning('This Message Type Did not define process method : {} / {}'.format(msg.get_msgId(), msg.get_type()))
continue
print('[mavlinkObject.py] End of mavlink_analyzer._run_thread')
logger.info('End of mavlink_analyzer._run_thread')
# === Node 區塊 ===
def _init_node(self):
pass
# self.publisher = self.create_publisher(String, 'mavlink_analyzer', 10)
# self.subscription = self.create_subscription(String, 'mavlink_analyzer', self.listener_callback, 10)
logger.info('Start of mavlink_analyzer._init_node')
super().__init__('mavlink_analyzer') # TODO 不知道為何 這句耗時超長 可以到 5~10 秒
def emit_info(self):
# 這邊將 mavlink_systems 內所有的 device object 內所有的 component 輪循過
# 把 emitParams 的參數發送出去
for sysid in self.mavlink_systems:
for compid in self.mavlink_systems[sysid].components:
# 先抓 base_mode 驗證看看算法是否正確
msg_str = self.mavlink_systems[sysid].components[compid].base_mode
self.publisher.publish(msg_str)
# self.get_logger().info('Publishing: "%s"' % msg.data)
# 這個 class 是用來記錄每個 system 的資訊 每個 system 可能會有多個 component
class mavlink_device():
def __init__(self):
self.socket_id = None # 記錄來自於哪個 socket
self.sysid = None
self.components = {} # 用來記錄每個 component 的資訊 key 是 compid, value 是 component object
def __str__(self):
p_str = ''
p_str += f'socket_id : {self.socket_id}\n'
p_str += f'sysid : {self.sysid}\n'
p_str += 'has components : \n'
for compid in self.components:
p_str += f'compid : {compid}\n'
p_str += f'mav_type : {self.components[compid].mav_type}\n'
p_str += '=====================\n'
return p_str
def updateComponentPacketCount(self, compid, current_seq, current_type, current_time):
# 這段檢查遺失封包
try:
last_seq = self.components[compid].msg_seq
except KeyError:
# 這個 component id 還不存在
print('[mavlinkObject.py] Warning System ID : {} This Component ID : {} Did not init yet'.format(self.sysid, compid))
return
if last_seq != None:
expected_seq = (last_seq + 1) % 256
diff = current_seq - expected_seq
# print("current last exp diff : ",current_seq, last_seq, expected_seq, diff) # debug
if diff < 0:
diff += 256
self.components[compid].lost_packet_count += diff
# 這段更新封包的基本資訊
self.components[compid].msg_seq = current_seq
self.components[compid].last_msg_time = current_time
if current_type in self.components[compid].msg_count:
self.components[compid].msg_count[current_type] += 1
else:
self.components[compid].msg_count[current_type] = 1
def resetComponentPacketCount(self, compid):
self.components[compid].msg_count = {}
self.components[compid].msg_seq = None
self.components[compid].lost_packet_count = 0
class mavlink_component():
# 這個 class 用來存放該 component 參數,然而這些參數是固定以一個頻率要輸出給 ros2 topic
class emitParamSet():
def __init__(self):
for sysid, device in self.mavlink_systems.items():
for compid, component in device.components.items():
for topic_name in component.publishers.keys():
publisher = component.publishers[topic_name][0]
packEmit_func = component.publishers[topic_name][1]
packEmit_func(component.emitParams, publisher)
def _del_node(self):
# TODO 這邊要刪除 node 的時候要做的事情
# 先註銷所有 mavlink_systems 中 component 的 publisher
# 再註銷所有 mavlink_systems 中的 device object
# 再註銷 node
pass
def add_parameter(self, param_name):
setattr(self, param_name, None)
def __init__(self):
self.mav_type = None # 表示 Vehicle 或 component type (例如: 旋翼機是2, 雲台是26, GCS是6)
self.mav_autopilot = None # 表示 autopilot type (例如: ardupilot是3, px4是12)
self.mav_system_status = None # 表示 system status (例如: active是3, standby是4)
# 紀錄從這個 component 收到最後的訊息序號和時間
self.msg_count = {}
self.msg_seq = None
self.lost_packet_count = 0
self.last_msg_time = 0
# 存放該 emit component 參數的區域
self.emitParams = self.emitParamSet()
def main(args=None):
# print('Start of Program .Test 1')
# # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
# connection_string="udp:127.0.0.1:14550"
# mavlink_socket = mavutil.mavlink_connection(connection_string)
test_item = 2
print('test_item : ', test_item)
# mavlink_object = MavlinkObject(mavlink_socket, 1)
# mavlink_object.multiplexingToAnalysis = [30] # only ATTITUDE
# mavlink_object.multiplexingToReturn = [0] # only HEARTBEAT
# mavlink_object.multiplexingToConvert = [[74,]] # only VFR_HUD
if test_item == 1:
# 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
# 只啟用了 mavlink_object 的功能
print('Start of Program .Test 1')
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
# # 做一個空的 queue list 驗證 multiplexingToConvert
# converte_format_queues.append(queue.Queue())
# # 啟動連線的模組
# mavlink_object.run()
mavlink_object1 = mavlink_object(mavlink_socket, 1)
mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE
mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT
mavlink_object1.multiplexingToConvert = [[74,]] # only VFR_HUD
# # 運行幾秒並印出 queue 的資料
# start_time = time.time()
# while time.time() - start_time < 2:
# 做一個空的 queue list 驗證 multiplexingToConvert
converte_format_queues.append(queue.Queue())
# 啟動連線的模組
mavlink_object1.run()
# while not fixed_stream_analyzer_queue.empty():
# print('fixed_stream_analyzer_queue:')
# t = fixed_stream_analyzer_queue.get()
# print('from {} : {}'.format(t[0],t[2]))
# while not return_packet_processor_queue.empty():
# print('return_packet_processor_queue:')
# t = return_packet_processor_queue.get()
# # print('from {} : {}'.format(t[0],t[2]))
# print(t[2].type)
# for q in converte_format_queues:
# while not q.empty():
# print('converte_format_queues:')
# t = q.get()
# 運行幾秒並印出 queue 的資料
start_time = time.time()
while time.time() - start_time < 2:
while not fixed_stream_analyzer_queue.empty():
print('fixed_stream_analyzer_queue:')
t = fixed_stream_analyzer_queue.get()
print('from {} : {}'.format(t[0],t[2]))
while not return_packet_processor_queue.empty():
print('return_packet_processor_queue:')
t = return_packet_processor_queue.get()
# print('from {} : {}'.format(t[0],t[2]))
print(t[2].type)
for q in converte_format_queues:
while not q.empty():
print('converte_format_queues:')
t = q.get()
print('from {} : {}'.format(t[0],t[2]))
# # 結束程式 退出所有 thread
# mavlink_object.running = False
# mavlink_object.thread.join()
# mavlink_socket.close()
# print('End of Program .Test 1')
# 結束程式 退出所有 thread
mavlink_object1.running = False
mavlink_object1.thread.join()
mavlink_socket.close()
print('End of Program .Test 1')
# # # ========================================================================
elif test_item == 2:
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
# 啟用 mavlink_object 與 mavlink_analyzer的thread區塊 的功能
print('Start of Program .Test 2')
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object = MavlinkObject(mavlink_socket, 1)
mavlink_object.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object.multiplexingToConvert = [] #
mavlink_object2 = mavlink_object(mavlink_socket, 1)
mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object2.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object2.multiplexingToConvert = [] #
# 啟動連線的模組
mavlink_object.run()
# 啟動 mavlink_analyzer
analyzer = mavlink_analyzer()
compid = 1
# 運行幾秒
# 啟動連線的模組
mavlink_object2.run()
start_time = time.time()
show_time = time.time()
while time.time() - start_time < 10:
compid = 1
while time.time() - start_time < 5:
if (time.time() - show_time) >= 1:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count)
print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count)
print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams.flight_mode)
print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in mavlink_systems:
print(mavlink_systems[sysid])
# 結束程式 退出所有 thread
mavlink_object.stop()
mavlink_object.thread.join()
mavlink_object2.stop()
mavlink_object2.thread.join()
analyzer.stop()
analyzer = mavlink_analyzer() # 這邊是測試是否只有一個 instance
analyzer.thread.join()
mavlink_socket.close()
print('End of Program .Test 2')
elif test_item == 3:
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('Start of Program .Test 3')
rclpy.init() # 注意要初始化 rclpy 才能使用 node
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mavlink_object(mavlink_socket, 1)
mavlink_object3.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object3.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object3.multiplexingToConvert = [] #
# 啟動 mavlink_analyzer
analyzer = mavlink_analyzer()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 啟動連線的模組
mavlink_object3.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < 20:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(0.5)
except KeyboardInterrupt:
break
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('End of Program .Test 3')
if __name__ == '__main__':
import rclpy # 這邊是為了測試而加的
main()

@ -0,0 +1,52 @@
'''
這個檔案只有一個 class
是作為 mavlinkObject.py mavlink_analyzer 類別的功能衍生
主要概念是將 "離散的" mavlink 參數轉換成 ROS topic
包含了創建 publisher 以及包裝並丟到 ros2 topic packEmit
publisher topic name 命名規則為 <前綴詞>/s<sysid>/<具體 topic>
'''
import std_msgs.msg
from theLogger import setup_logger
logger = setup_logger("mavlinkPublish.py")
class mavlink_publisher():
prefix_path = 'MavLinkBus'
def create_flightMode(self, sysid, component_obj):
# target topic name # 請跟這個 method 的名稱保持一致
target_topic = 'flightMode'
# 這邊要檢查 flight_mode 是否存在
try:
_ = component_obj.emitParams['flightMode_mode']
except KeyError:
# 這個 component id 還不存在
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
# 若存在則 建立 publisher object 並回傳 true
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(std_msgs.msg.String, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_flightMode]
return True
def packEmit_flightMode(cls, emitParams, publisher):
msg_str = emitParams['flightMode_mode']
msg = std_msgs.msg.String()
msg.data = msg_str
publisher.publish(msg)
pass
# ↓↓↓↓↓↓↓↓↓↓↓↓ 處理不同 ros2 topic 訊息 請放在這裡 ↓↓↓↓↓↓↓↓↓↓↓↓
# ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同 ros2 topic 訊息 請放在這裡 ↑↑↑↑↑↑↑↑↑↑↑↑

@ -0,0 +1,43 @@
import logging
import os
from logging.handlers import TimedRotatingFileHandler
# 全域 Logger 實例
_global_logger = None
def setup_logger(name: str, log_dir: str = "log", level=logging.DEBUG) -> logging.Logger:
global _global_logger
if _global_logger is None:
# 確保 logs 資料夾存在
os.makedirs(log_dir, exist_ok=True)
# 建立全域 Logger
_global_logger = logging.getLogger("global_logger")
_global_logger.setLevel(level)
_global_logger.propagate = False # 防止重複輸出
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%m-%d %H:%M:%S"
)
# 檔案輸出(每天輪替)
file_handler = TimedRotatingFileHandler(
filename=os.path.join(log_dir, "application.log"),
when="midnight", # 每天 0 點輪替
backupCount=7, # 保留 7 天
encoding="utf-8"
)
file_handler.setFormatter(formatter)
_global_logger.addHandler(file_handler)
# 終端機輸出
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
_global_logger.addHandler(console_handler)
# 為每個模組建立子 Logger並設定名稱
module_logger = _global_logger.getChild(name)
module_logger.name = name # 修改子 Logger 的名稱,僅保留子 Logger 名稱
return module_logger

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save