(fix) 1. 把 bridge 的種類補上可以跟 ros2 搭配

2. 一些 ros2 service 的基底 仍在開發
chiyu
Chiyu Chen 3 months ago
parent bbd120d25a
commit 3d48b1d9fe

@ -3,8 +3,23 @@
只會挑出重要的變數與方法描述
以利後續開發使用
# 開發此專案的注意事項
- 預設 autopilot 的 component id = 1
- 不允許 system id 重複
- 增加一個固定數值監控然後要到 ros2 topic
- mavlinkROS2Node.py 檔案內
- PublishRateController.topic_intervals 建立
- VehicleStatusPublisher._publish_vehicle_status 登記
- VehicleStatusPublisher._publish_XXX 實作
- mavlinkObject.py 檔案內
- mavlink_bridge.message_handlers 登記
- mavlink_bridge._handle_XXX 實作
- mavlink_object.bridge_msg_types 登記 (這個可以用介面調)
- mavlinkVehicleView.py 檔案內
- 注意對應的資料存放區
---
# 檔案結構
特別注意:
@ -40,7 +55,7 @@
---
關於載具管理與檢視
- *_update_vehicles_list()*
- *_prepare_vehicle_info*
- *_prepare_vehicle_info()*
---
關於 serial_manager 控制實現
- *create_serial_port_object()*
@ -167,8 +182,9 @@
7. 終端機介面控制
8. 基礎載具流量觀測
9. 載具狀態收集與彙整
10. a. ros2 topic 應用開發介面
### 待開發功能
5-1. 建立 serial 連線 並可以對接收器下達AT指令
5-2. 模組化 serial 連線機制 以利後期擴容其他模組
10-1. ros2 應用開發介面
10. a. ros2 應用開發介面

@ -29,7 +29,7 @@ from .utils import acquireSerial, acquirePort
from .utils.acquirePort import find_available_port
logger = setup_logger(os.path.basename(__file__))
VERSION_NO = "v0.58"
VERSION_NO = "v0.59"
class PanelState:
def __init__(self):

@ -367,7 +367,7 @@ class mavlink_object:
self.outgoing_msgs = deque()
# 記錄訊息過濾類型 (可選)
self.bridge_msg_types = set([0, 30]) # 0 HEARTBEAT, 30 ATTITUDE, ...
self.bridge_msg_types = set([0, 30, 33, 74, 147]) # 0 HEARTBEAT, 30 ATTITUDE, 33 GLOBAL_POSITION_INT, 74 VFR_HUD, 147 BATTERY_STATUS
self.return_msg_types = set()
# 轉發到別的 mavlink object 作為目標端口 的列表

@ -39,6 +39,11 @@ class PublishRateController:
"""發布頻率控制器 - 按時間間隔控制發布頻率"""
def __init__(self):
# ═══════════════════════════════════════════════════════════════
# 【新增 Topic 位置 1/4】
# 若要新增 topic 種類,請在此字典中加入新的 topic 名稱和發布間隔
# 例如:'ekf_status': 1.0, # EKF 狀態
# ═══════════════════════════════════════════════════════════════
# 各 topic 的發布間隔(秒)
self.topic_intervals = {
'position': 0.5, # GPS位置
@ -48,6 +53,7 @@ class PublishRateController:
'vfr_hud': 0.5, # VFR HUD
'mode': 1.0, # 飛行模式
'summary': 1.0, # 載具摘要
# 在這裡新增更多 topics...
}
# 記錄每個 topic 的最後發布時間 {(sysid, topic): timestamp}
self.last_publish_time: Dict[tuple, float] = {}
@ -150,6 +156,11 @@ class VehicleStatusPublisher(Node):
status = component.status
# ═══════════════════════════════════════════════════════════════
# 【新增 Topic 位置 2/4】
# 若要新增 topic請在此處調用對應的發布方法
# 例如self._publish_ekf_status(sysid, status)
# ═══════════════════════════════════════════════════════════════
# 發布各種狀態(通過頻率控制器判斷是否發布)
self._publish_position(sysid, status)
self._publish_attitude(sysid, status)
@ -158,6 +169,7 @@ class VehicleStatusPublisher(Node):
self._publish_vfr_hud(sysid, status)
self._publish_mode(sysid, status)
self._publish_summary(vehicle)
# 在這裡新增更多 publish 方法調用...
def _get_or_create_publisher(self, sysid: int, topic: str, msg_type, qos: int = 1):
"""
@ -372,12 +384,12 @@ class VehicleStatusPublisher(Node):
'autopilot': component.mav_autopilot if component.mav_autopilot else 0,
'socket_id': vehicle.custom_meta.get('socket_id', -1), # 重要!
'armed': status.armed if status.armed is not None else False,
'mode_custom': status.mode.custom_mode if status.mode.custom_mode else 0,
# 'mode_custom': status.mode.custom_mode if status.mode.custom_mode else 0,
'mode_name': status.mode.mode_name if status.mode.mode_name else "UNKNOWN",
'latitude': status.position.latitude if status.position.latitude else 0.0,
'longitude': status.position.longitude if status.position.longitude else 0.0,
'altitude': status.position.altitude if status.position.altitude else 0.0,
'battery_percent': status.battery.remaining if status.battery.remaining else 0,
# 'latitude': status.position.latitude if status.position.latitude else 0.0,
# 'longitude': status.position.longitude if status.position.longitude else 0.0,
# 'altitude': status.position.altitude if status.position.altitude else 0.0,
# 'battery_percent': status.battery.remaining if status.battery.remaining else 0,
'gps_fix': status.gps.fix_type if status.gps.fix_type else 0,
'connection_type': vehicle.connected_via.value,
'last_update': component.packet_stats.last_msg_time if component.packet_stats.last_msg_time else 0.0,
@ -387,6 +399,24 @@ class VehicleStatusPublisher(Node):
msg.data = json.dumps(summary)
publisher.publish(msg)
# ═══════════════════════════════════════════════════════════════
# 【新增 Topic 位置 3/4】
# 若要新增 topic請在此處實作對應的發布方法
# 方法命名規則def _publish_<topic_name>(self, sysid: int, status: mvv.ComponentStatus):
# 例如:
# def _publish_ekf_status(self, sysid: int, status: mvv.ComponentStatus):
# """發布 EKF 狀態"""
# if not self.rate_controller.should_publish(sysid, 'ekf_status'):
# return
#
# ekf = status.ekf
# if ekf.flags is None:
# return
#
# publisher = self._get_or_create_publisher(sysid, 'ekf_status', ...
# # ... 實作發布邏輯
# ═══════════════════════════════════════════════════════════════
@staticmethod
def _euler_to_quaternion(roll, pitch, yaw):
"""
@ -430,17 +460,275 @@ class MavlinkCommandService(Node):
- 調用 mavlinkObject 發送封包
- 處理 ACK 等待和超時未來實現
TODO: 稍後實現
設計理念回歸 MAVLink 純粹結構
- 只負責將 ROS2 請求轉換為 MAVLink 封包
- 不預設功能 ARM/DISARM保持模組化
- 高層應用可透過此 service 實現各種功能
"""
def __init__(self):
super().__init__('mavlink_command_service')
logger.info("MavlinkCommandService initialized (not implemented yet)")
# ═══════════════════════════════════════════════════════════════════
# ROS2 Service 架構說明:
#
# 1. Service 定義:由 .srv 檔案定義Request + Response
# - Request: client 發送的請求內容
# - Response: server 回傳的結果
#
# 2. Service Server 創建:
# self.create_service(srv_type, service_name, callback_function)
# - srv_type: service 的訊息類型(需要自定義或使用標準)
# - service_name: service 的名稱client 用此名稱呼叫)
# - callback_function: 處理請求的回調函數
#
# 3. Callback 函數:
# def callback(self, request, response):
# # request: 包含 client 發送的數據
# # response: 需要填充並返回給 client
# return response
#
# 4. Service Client 使用方式(在其他程式中):
# client = node.create_client(srv_type, service_name)
# request = srv_type.Request()
# future = client.call_async(request) # 異步調用
# # 或 response = client.call(request) # 同步調用
# ═══════════════════════════════════════════════════════════════════
# 由於 ROS2 自定義 service 需要 .srv 檔案編譯
# 這裡先使用標準 String service 作為簡化實現
# TODO: 未來可創建專門的 .srv 檔案
from std_srvs.srv import Trigger
from example_interfaces.srv import SetBool
# ═══════════════════════════════════════════════════════════════════
# Service 1: 發送 MAVLink Message通用介面
# 使用 Trigger 作為臨時實現,未來應使用自定義 service
# ═══════════════════════════════════════════════════════════════════
# TODO: 創建 SendMavlinkMessage.srv
# Request:
# uint8 target_sysid
# uint8 target_compid
# uint16 message_id
# string fields_json # JSON 格式的字段數據
# bool wait_response
# uint16 response_msgid
# float32 timeout
# Response:
# bool success
# string response_json
# string error_message
# 暫時使用簡化版本(僅示範架構)
self.srv_send_message = self.create_service(
Trigger,
'/mavlink/send_message',
self.handle_send_message
)
# ═══════════════════════════════════════════════════════════════════
# Service 2: 發送 COMMAND_LONG
# ═══════════════════════════════════════════════════════════════════
self.srv_command_long = self.create_service(
Trigger,
'/mavlink/send_command_long',
self.handle_command_long
)
# ═══════════════════════════════════════════════════════════════════
# Service 3: 參數請求
# ═══════════════════════════════════════════════════════════════════
self.srv_param_request = self.create_service(
Trigger,
'/mavlink/param_request',
self.handle_param_request
)
# 狀態標記
self.running = True
# mavlinkObject 的引用(將由外部設置)
self.mavlink_analyzer = None
logger.info("MavlinkCommandService initialized")
def set_mavlink_analyzer(self, mavlink_analyzer):
"""
設置 mavlink_analyzer 引用
Args:
mavlink_analyzer: mavlinkObject.mavlink_analyzer 實例
"""
self.mavlink_analyzer = mavlink_analyzer
logger.info("MavlinkCommandService: mavlink_analyzer set")
# ═══════════════════════════════════════════════════════════════════════
# Service Handler: 發送 MAVLink Message
# ═══════════════════════════════════════════════════════════════════════
def handle_send_message(self, request, response):
"""
處理發送 MAVLink 訊息的請求
ROS2 Service Callback 說明
- 此函數會在 client 調用 service 時被執行
- request: 包含 client 傳入的參數
- response: 需要填充結果並返回給 client
- 必須 return response
Args:
request: Trigger.Request (暫時使用未來改為自定義)
response: Trigger.Response
Returns:
response: 填充後的回應
"""
logger.info("Received send_message request")
# 檢查 mavlink_analyzer 是否已設置
if self.mavlink_analyzer is None:
response.success = False
response.message = "Error: mavlink_analyzer not set"
logger.error(response.message)
return response
# TODO: 實際實現
# 1. 從 request 解析參數target_sysid, message_id, fields 等)
# 2. 使用 pymavlink 組裝 MAVLink 封包
# 3. 調用 mavlink_analyzer.send_message() 發送
# 4. 如果 wait_response=True則等待 return_packet_ring 中的回應
# 暫時返回成功(示範用)
response.success = True
response.message = "Message sent (placeholder implementation)"
return response
# ═══════════════════════════════════════════════════════════════════════
# Service Handler: 發送 COMMAND_LONG
# ═══════════════════════════════════════════════════════════════════════
def handle_command_long(self, request, response):
"""
處理發送 COMMAND_LONG 的請求
COMMAND_LONG (MAVLink message ID=76):
- 用於發送簡單命令給載具
- 常用於 ARM/DISARM, 模式切換, TAKEOFF, LAND
Args:
request: Trigger.Request
response: Trigger.Response
Returns:
response: 填充後的回應
"""
logger.info("Received command_long request")
if self.mavlink_analyzer is None:
response.success = False
response.message = "Error: mavlink_analyzer not set"
return response
# TODO: 實際實現
# 1. 從 request 解析 COMMAND_LONG 參數
# - target_sysid, target_compid
# - command (MAV_CMD_xxx)
# - param1~param7
# 2. 組裝 COMMAND_LONG 封包
# 3. 發送並等待 COMMAND_ACK (message ID=77)
# 4. 解析 ACK 結果ACCEPTED/FAILED 等)
response.success = True
response.message = "Command sent (placeholder implementation)"
return response
# ═══════════════════════════════════════════════════════════════════════
# Service Handler: 參數請求
# ═══════════════════════════════════════════════════════════════════════
def handle_param_request(self, request, response):
"""
處理參數讀取請求
MAVLink 參數協議
- PARAM_REQUEST_READ (ID=20): 請求讀取參數
- PARAM_VALUE (ID=22): 參數值回應
- PARAM_SET (ID=23): 設置參數值
使用 mavlinkObject 回應機制的步驟
1. 設置回應訊息類型
self.mavlink_analyzer.set_return_message_types([22]) # PARAM_VALUE
2. 發送請求封包
message_bytes = ... # 組裝 PARAM_REQUEST_READ
self.mavlink_analyzer.send_message(
message_bytes,
target_sysid=1
)
3. 監聽回應在獨立線程或定時器中
from ..fc_network_adapter import mavlinkObject as mo
# 等待回應(帶超時)
timeout = 3.0
start_time = time.time()
while time.time() - start_time < timeout:
items = mo.return_packet_ring.get_all()
for socket_id, timestamp, msg in items:
if msg.get_type() == 'PARAM_VALUE':
# 找到回應!
param_id = msg.param_id
param_value = msg.param_value
# 處理回應...
return
time.sleep(0.01) # 短暫等待
# 超時處理
4. 清理可選
self.mavlink_analyzer.set_return_message_types([]) # 清空
mo.return_packet_ring.clear() # 清空緩衝區
注意事項
- return_packet_ring 是全域的所有 mavlink_object 共用
- 需要通過 socket_id sysid 來識別回應來源
- 實際使用時建議實現專門的回應管理器
Args:
request: Trigger.Request
response: Trigger.Response
Returns:
response: 填充後的回應
"""
logger.info("Received param_request")
if self.mavlink_analyzer is None:
response.success = False
response.message = "Error: mavlink_analyzer not set"
return response
# TODO: 實際實現
# 1. 從 request 解析參數名稱或索引
# 2. 設置 mavlink_analyzer.set_return_message_types([22]) # PARAM_VALUE
# 3. 發送 PARAM_REQUEST_READ
# 4. 監聽 return_packet_ring等待 PARAM_VALUE
# 5. 解析回應並填充到 response
response.success = True
response.message = "Param request sent (placeholder implementation)"
return response
# ═══════════════════════════════════════════════════════════════════════
# 【新增 Service 位置 4/4】
# 若要新增 service請在此處添加新的 handler 方法
# 並在 __init__ 中創建對應的 service server
# ═══════════════════════════════════════════════════════════════════════
def stop(self):
"""停止服務"""
# logger.info("MavlinkCommandService stopped")
self.running = False
logger.info("MavlinkCommandService stopped")
# ============================================================================

@ -323,7 +323,7 @@ class VehicleView:
"""
if component_id not in self.components:
self.components[component_id] = VehicleComponent(component_id, comp_type)
# logger.info(f"Added component {component_id} to system {self.sysid}")
# logger.debug(f"Added component {component_id} to system {self.sysid}")
return self.components[component_id]
def get_component(self, component_id: int) -> Optional[VehicleComponent]:
@ -334,7 +334,7 @@ class VehicleView:
"""移除組件"""
if component_id in self.components:
del self.components[component_id]
# logger.info(f"Removed component {component_id} from system {self.sysid}")
# logger.debug(f"Removed component {component_id} from system {self.sysid}")
return True
return False

@ -0,0 +1,35 @@
備選需要的功能
- serail 對於 telemetry 的支援
- serial_manager.serial_object.transport 這些變數可能不需要
不用動
- mavlink_object 的 send_message 確認一下 mavlink_bridge 的 _send_to_socket 是不是應該做成 async
- 不同 socket 上面有重複的 sysid 分開儲存 (不做 不允許sysid重複)
這一步
研究 ros2 service
下一步
下下一步
後面
rssi 資訊提取s
自己的常用指令
python -m fc_network_adapter.tests.test_vehicleStatusPublisher
python -m fc_network_adapter.tests.test_ringBuffer
ros2 topic list
ros2 topic echo
/home/picars/ardupilot/build/sitl/bin/arducopter -S --model + --speedup 1 --slave 0 --defaults /home/picars/ardupilot/Tools/autotest/default_params/copter.parm --sim-address=127.0.0.1 -I3 --sysid 7
mavproxy.py --master=tcp:127.0.0.1:5790 --out=udp:127.0.0.1:14560

@ -0,0 +1,27 @@
#!/bin/bash
# 網站清單
DOMAINS=("google.com" "smarter.nchu.edu.tw")
echo "網站 SSL 憑證剩餘天數:"
echo "---------------------------"
for domain in "${DOMAINS[@]}"; do
end_date=$(echo | openssl s_client -servername "$domain" -connect "$domain:443" 2>/dev/null |
openssl x509 -noout -enddate | cut -d= -f2)
end_timestamp=$(date -d "$end_date" +%s)
now_timestamp=$(date +%s)
remaining_days=$(( (end_timestamp - now_timestamp) / 86400 ))
if [ $remaining_days -lt 0 ]; then
status="已過期 ❌"
elif [ $remaining_days -lt 15 ]; then
status="即將到期 ⚠️"
else
status="正常 ✅"
fi
printf "%-20s 到期日:%-25s 剩餘天數:%3d 天 %s\n" "$domain" "$end_date" "$remaining_days" "$status"
done
Loading…
Cancel
Save