完善Xbee新功能整合到 mainOrchestrator

- Added support for a new XBee communication type (XBee(API-API) espv1) in mainOrchestrator.py.
- Changed logging level for publisher creation in mavlinkROS2Nodes.py from info to debug for improved verbosity.
- 移除 mainOrchestrator 啟動時的版本檢驗 改為用 logger 紀錄
- Py3.8 語法修正
chiyu
Chiyu Chen 22 hours ago
parent 4c3fe226d6
commit d734a7bfee

@ -435,6 +435,10 @@ class ControlPanel:
state.serial_info_temp["CommunicationType"] = "XBee(API-AT)" state.serial_info_temp["CommunicationType"] = "XBee(API-AT)"
menu_stack.pop() menu_stack.pop()
idx_stack.pop() idx_stack.pop()
elif selected.action == "SET_SERIAL_COMM_XBEE_ESP":
state.serial_info_temp["CommunicationType"] = "XBee(API-API) espv1"
menu_stack.pop()
idx_stack.pop()
elif selected.action == "SET_SERIAL_COMM_TELEMETRY": elif selected.action == "SET_SERIAL_COMM_TELEMETRY":
state.serial_info_temp["CommunicationType"] = "TELEMETRY" state.serial_info_temp["CommunicationType"] = "TELEMETRY"
menu_stack.pop() menu_stack.pop()
@ -812,6 +816,7 @@ class ControlPanel:
port_menu = MenuNode(f"{port}", children=[ port_menu = MenuNode(f"{port}", children=[
MenuNode("Set Comm Type", "設定通訊形態", "SET_SERIAL_COMM", children=[ MenuNode("Set Comm Type", "設定通訊形態", "SET_SERIAL_COMM", children=[
MenuNode("XBee(API-AT)", "XBee 模式", "SET_SERIAL_COMM_XBEE"), MenuNode("XBee(API-AT)", "XBee 模式", "SET_SERIAL_COMM_XBEE"),
MenuNode("XBee(API-API)", "XBee 模式(with ESP)", "SET_SERIAL_COMM_XBEE_ESP"),
MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"), MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"),
]), ]),
MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"), MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"),
@ -1595,6 +1600,7 @@ class Orchestrator:
# 定義通訊類型映射表 # 定義通訊類型映射表
COMM_TYPE_MAP = { COMM_TYPE_MAP = {
"XBee(API-AT)": sm.SerialMode.XBEEAPI2AT, "XBee(API-AT)": sm.SerialMode.XBEEAPI2AT,
"XBee(API-AT)": sm.SerialMode.XBEEAPI_espv1,
"TELEMETRY": sm.SerialMode.STRAIGHT, "TELEMETRY": sm.SerialMode.STRAIGHT,
# 新增區 # 新增區
} }
@ -1638,25 +1644,7 @@ class Orchestrator:
def main(): def main():
# =========== 各項模組的版本先驗 =========== logger.info(f"Each Module Running Version at mavlinkObkect:{mo.MODULE_VER}, mavlinkROS2Nodes:{mros.MODULE_VER}, mavlinkVehicleView:{mvv.MODULE_VER}, serialManager:{sm.MODULE_VER}")
# 除非你有在做這幾項模組的改版 不然動到這邊的版本號 代表執行環境有很大的問題!!!!!!
version_check = True
if mo.MODULE_VER != "1.50":
print("Module Version Error! : mavlinkObkect")
version_check = False
if mros.MODULE_VER != "2.50":
print("Module Version Error! : mavlinkROS2Nodes")
version_check = False
if mvv.MODULE_VER != "1.10":
print("Module Version Error! : mavlinkVehicleView")
version_check = False
if sm.MODULE_VER != "2.00":
print("Module Version Error! : serialManager")
version_check = False
if version_check == False:
print("Environment Obstacle! Check YOUR Execution System Path First!!")
return
# ========================================
stop_evt = threading.Event() stop_evt = threading.Event()
def signal_handler(signum, frame): def signal_handler(signum, frame):

@ -214,7 +214,7 @@ class VehicleStatusPublisher(Node):
topic_name = f'{self.topicString_prefix}/sys{sysid}/{topic}' topic_name = f'{self.topicString_prefix}/sys{sysid}/{topic}'
publisher = self.create_publisher(msg_type, topic_name, qos) publisher = self.create_publisher(msg_type, topic_name, qos)
self.fc_publishers[key] = publisher self.fc_publishers[key] = publisher
logger.info(f"Created publisher: {topic_name}") logger.debug(f"Created publisher: {topic_name}")
return self.fc_publishers[key] return self.fc_publishers[key]
def _publish_position_gnss(self, sysid: int, status: mvv.ComponentStatus): def _publish_position_gnss(self, sysid: int, status: mvv.ComponentStatus):

@ -19,6 +19,7 @@ from enum import Enum, auto
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from typing import Callable, Optional from typing import Callable, Optional
from typing import List
# # XBee 模組 # # XBee 模組
# from xbee.frame import APIFrame # from xbee.frame import APIFrame
@ -449,11 +450,11 @@ class XBeeFrameProcessor_ESPv1(XBeeFrameProcessor_Base):
return len(self.gcs_transmit_queue) return len(self.gcs_transmit_queue)
# 把 gcs_transmit_queue 的 mavlink 封包依照大小打包出來 # 把 gcs_transmit_queue 的 mavlink 封包依照大小打包出來
def _pop_flush_batch(self, max_bytes: int) -> list[bytes]: def _pop_flush_batch(self, max_bytes: int) -> List[bytes]:
if not self.gcs_transmit_queue: if not self.gcs_transmit_queue:
return [] return []
batch: list[bytes] = [] batch: List[bytes] = []
total_bytes = 0 total_bytes = 0
while self.gcs_transmit_queue: while self.gcs_transmit_queue:
@ -507,7 +508,7 @@ class XBeeFrameProcessor_ESPv1(XBeeFrameProcessor_Base):
# ---- POLL 排程輔助 ---- # ---- POLL 排程輔助 ----
# 計算下次要 poll 的對象跟大小 # 計算下次要 poll 的對象跟大小
def _pick_poll_target(self) -> tuple[Optional[bytes], int]: def _pick_poll_target(self):
poll_devices = [ poll_devices = [
pollStrategy.PollDevice( pollStrategy.PollDevice(
address_64=address_64, address_64=address_64,

@ -5,6 +5,7 @@ POLL 輪詢策略,供 XBeeFrameProcessor_ESPv1 使用。
""" """
from dataclasses import dataclass from dataclasses import dataclass
from typing import List
@dataclass(frozen=True) @dataclass(frozen=True)
@ -23,10 +24,9 @@ class PollSchedulerState:
def pick_next( def pick_next(
devices: list[PollDevice], devices: List[PollDevice],
scheduler_state: PollSchedulerState, scheduler_state: PollSchedulerState
# default_grant_bytes: int = 600, ):
) -> tuple[bytes | None, int]:
""" """
選下一個 POLL 目標 選下一個 POLL 目標

Loading…
Cancel
Save