# (update)

1. serialManager.py - AT command 支援與架構重整
2. mainOrchestrator.py - serial 介面功能開啟
lunu
Chiyu Chen 2 weeks ago
parent 7b7e02a9e7
commit cc00427848

@ -29,7 +29,7 @@ from .utils import acquireSerial, acquirePort
from .utils.acquirePort import find_available_port
logger = setup_logger(os.path.basename(__file__))
PROJECT_VER = "v0.61"
PROJECT_VER = "v0.70"
class PanelState:
def __init__(self):
@ -45,7 +45,7 @@ class PanelState:
# 關於創建通道時的暫存資訊
self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊
self.serial_info_temp = {"Port": "", "Baud": 115200, "CommunicationType": "", "Go2Middleware": False} # 暫存 Serial 設定資訊
self.serial_info_temp = {"Port": "", "Baud": 115200, "CommunicationType": "", "Go2Middleware": True} # 暫存 Serial 設定資訊
# 關於顯示通道資訊
self.socket_info_single = {
@ -426,7 +426,7 @@ class ControlPanel:
menu_stack.pop()
idx_stack.pop()
elif selected.action == "SET_SERIAL_COMM_TELEMETRY":
state.serial_info_temp["CommunicationType"] = "XBee(AT-AT)"
state.serial_info_temp["CommunicationType"] = "TELEMETRY"
menu_stack.pop()
idx_stack.pop()
@ -767,7 +767,7 @@ class ControlPanel:
port_menu = MenuNode(f"{port}", children=[
MenuNode("Set Comm Type", "設定通訊形態", "SET_SERIAL_COMM", children=[
MenuNode("XBee(API-AT)", "XBee 模式", "SET_SERIAL_COMM_XBEE"),
# MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"),
MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"),
]),
MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"),
MenuNode("Link to MW", "直接建立 Middleware UDP", "LINK_SERIAL_TO_MIDDLEWARE_UDP", children=[
@ -1058,11 +1058,15 @@ class ControlPanel:
# MAVLink 訊息名稱對應(縮寫版本)
MSG_NAMES = {
0: "HB", 1: "SYS_ST", 24: "GPS_RAW", 27: "RAW_IMU",
0: "HB", 1: "S_STAT", 2: "S_TIME", 24: "GPS_RAW", 27: "RAW_IMU",
30: "ATT", 32: "LOC_POS", 33: "GLB_POS", 62: "NAV_CTL",
74: "VFR_HUD", 147: "BATT_ST"
74: "VFR_HUD", 147: "BATT_ST", 136: "TERRAIN", 241: "VIBRAT",
125: "POW_STA",
}
# ardupilot mega
# 178: "AHRS2", 163: "AHRS", 193: "EKF_STAT",
# 顯示前 12 個最常見的訊息類型(兩列各 6 個)
msg_items = list(msg_counts.items())[:12]
line = 13
@ -1134,6 +1138,7 @@ class Orchestrator:
self.bridge = mo.mavlink_bridge()
# === 3) serial_manager 部分的準備 ===
sm.rx_module_ack.clear()
self.plumber = sm.serial_manager()
# === 4) ros 部分的準備 ===
@ -1243,7 +1248,7 @@ class Orchestrator:
if serial_obj:
self.panelState.serial_info_single["serial_port"] = serial_obj.serial_port
self.panelState.serial_info_single["baudrate"] = serial_obj.baudrate
self.panelState.serial_info_single["receiver_type"] = serial_obj.receiver_type.name
self.panelState.serial_info_single["receiver_type"] = serial_obj.serial_mode.name
self.panelState.serial_info_single["target_port"] = serial_obj.target_port
self.panelState.serial_info_single["InfoReady"] = True # 標記資訊已準備好
elif action == "REMOVE_LINKED_SERIAL":
@ -1516,8 +1521,8 @@ class Orchestrator:
# 定義通訊類型映射表
COMM_TYPE_MAP = {
"XBee(API-AT)": sm.SerialReceiverType.XBEEAPI2AT,
# "XBee(AT-AT)": sm.SerialReceiverType.TELEMETRY, # TODO: 之後再弄
"XBee(API-AT)": sm.SerialMode.XBEEAPI2AT,
"TELEMETRY": sm.SerialMode.STRAIGHT,
# 新增區
}
@ -1541,7 +1546,7 @@ class Orchestrator:
serial_port=self.panelState.serial_info_temp['Port'],
baudrate=self.panelState.serial_info_temp['Baud'],
target_port=udp_port_tmp,
receiver_type=comm_type_tmp,
serial_mode=comm_type_tmp,
)
if not ret:
@ -1572,7 +1577,7 @@ def main():
if mvv.MODULE_VER != "1.00":
print("Module Version Error! : mavlinkVehicleView")
version_check = False
if sm.MODULE_VER != "0.60":
if sm.MODULE_VER != "0.80":
print("Module Version Error! : serialManager")
version_check = False
if version_check == False:

@ -230,6 +230,7 @@ class mavlink_bridge:
'vx': msg.vx, 'vy': msg.vy, 'vz': msg.vz
}
def _handle_global_position(self, vehicle, component, msg, timestamp):
"""處理 GLOBAL_POSITION_INT 訊息 (msg_id: 33)"""
component.status.position.latitude = msg.lat / 1e7 # 轉換為度

@ -210,7 +210,9 @@ class VehicleStatusPublisher(Node):
if pos.latitude is None or pos.longitude is None:
return
publisher = self._get_or_create_publisher(sysid, 'position', sensor_msgs.msg.NavSatFix)
publisher = self._get_or_create_publisher(
sysid, 'position', sensor_msgs.msg.NavSatFix
)
# 檢查是否有訂閱者
if publisher.get_subscription_count() == 0:
@ -243,7 +245,7 @@ class VehicleStatusPublisher(Node):
publisher = self._get_or_create_publisher(
sysid, 'position_ned', nav_msgs.msg.Odometry
)
)
if publisher.get_subscription_count() == 0:
return

@ -16,6 +16,7 @@ import threading
import struct
from enum import Enum, auto
from abc import ABC, abstractmethod
from dataclasses import dataclass
# # XBee 模組
# from xbee.frame import APIFrame
@ -28,7 +29,7 @@ from .utils import RingBuffer, setup_logger
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "0.80"
rx_module_ack = RingBuffer(capacity=256, buffer_id=253)
rx_module_ack = RingBuffer(capacity=64, buffer_id=253)
# ====================== State DEFINITION =====================
@ -39,6 +40,29 @@ class SerialMode(Enum):
XBEEAPI2AT = auto() # XBee API 模式
NOT_USE = auto() # 不使用
# ====================== AT Frame Data Classes =====================
@dataclass
class ATRequest:
"""送往 dongle 的 AT 指令"""
command: bytes # 例如 b'DB'
parameter: bytes # 寫入用指令的參數,讀取型通常為空
frame_id: int # XBee frame id0x00 表示不要求回應
@dataclass
class ATResponse:
"""dongle 回傳的 AT Response (0x88)"""
frame_id: int
command: bytes
status: int
data: bytes
@property
def is_ok(self) -> bool:
return self.status == 0x00
# ====================== Frame Processor Base and Implementation =====================
class FrameProcessor(ABC):
@ -229,49 +253,51 @@ class ATCommandHandler:
"""由 SerialHandler 注入實際寫入 serial 的 callable"""
self.writer = writer
def send_command(self, command: bytes, parameter: bytes, frame_id: int):
"""
發送一筆 AT 指令給 dongle
- command: 2 bytes AT 指令名稱例如 b'DB'
- parameter: 指令參數 bytes讀取用指令通常為空
- frame_id: XBee frame id用來配對回應0x00 表示不要求回應
"""
def send_command(self, request: ATRequest):
"""發送一筆 AT 指令給 dongle"""
if self.writer is None:
logger.warning(
f"[{self.serial_port}] AT writer 尚未就緒,指令 {command!r} 丟棄"
f"[{self.serial_port}] AT writer 尚未就緒,指令 {request.command!r} 丟棄"
)
return
frame_bytes = self._build_at_request(command, parameter, frame_id)
self.writer(frame_bytes)
self.writer(self._build_at_request(request))
# logger.debug(
# f"[{self.serial_port}] send AT {command.decode(errors='replace')} "
# f"(frame_id=0x{frame_id:02X})"
# f"[{self.serial_port}] send AT {request.command.decode(errors='replace')} "
# f"(frame_id=0x{request.frame_id:02X})"
# ) # dev
@staticmethod
def _build_at_request(command: bytes, parameter: bytes, frame_id: int) -> bytes:
"""
AT 指令組成 XBee API AT Command Request frame (0x08)
callers 必須明確指定 frame_id
"""
def _build_at_request(request: ATRequest) -> bytes:
"""將 ATRequest 組成 XBee API AT Command Request frame (0x08) bytes"""
frame_type = ATCommandHandler.FRAME_TYPE_AT_COMMAND
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += command + parameter
frame = struct.pack(">B", frame_type) + struct.pack(">B", request.frame_id)
frame += request.command + request.parameter
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
# ---- 接收端 ----
def handle_frame(self, frame: bytes) -> None:
"""接收一整個 AT Response frame內部完成 parse + dispatch"""
"""
接收一整個 AT Response frame
1. 解析成 ATResponse
2. 推進 rx_module_ack 供其他模組消費
3. 本地 dispatch 給對應的 _handle_xxx
"""
parsed = self._parse(frame)
if parsed is None:
return
if not rx_module_ack.put(parsed):
logger.warning(
f"[{self.serial_port}] rx_module_ack overflow, drop {parsed.command!r}"
)
self._dispatch(parsed)
@staticmethod
def _parse(frame: bytes) -> dict:
def _parse(frame: bytes) -> ATResponse:
"""解析 AT Command Response (0x88);不符格式回傳 None"""
if len(frame) < 8:
return None
@ -279,44 +305,40 @@ class ATCommandHandler:
if frame[3] != 0x88:
return None
frame_id = frame[4]
at_command = frame[5:7]
status = frame[7]
data = frame[8:] if len(frame) > 8 else b''
return {
'frame_id': frame_id,
'command': at_command,
'status': status,
'data': data,
'is_ok': status == 0x00,
}
return ATResponse(
frame_id=frame[4],
command=frame[5:7],
status=frame[7],
data=frame[8:] if len(frame) > 8 else b'',
)
def _dispatch(self, response: dict) -> None:
def _dispatch(self, response: ATResponse) -> None:
"""根據 AT 指令類型分派處理"""
# print(f"[{self.serial_port}] AT Response: {response}") # dev
if not response['is_ok']:
if not response.is_ok:
logger.warning(
f"[{self.serial_port}] AT {response['command'].decode()} "
f"失敗,狀態碼: {response['status']}"
f"[{self.serial_port}] AT {response.command.decode()} "
f"失敗,狀態碼: {response.status}"
)
return
handler = self.handlers.get(response['command'])
handler = self.handlers.get(response.command)
if handler:
handler(response['data'])
handler(response.data)
else:
logger.debug(
f"[{self.serial_port}] 未處理的 AT 指令: "
f"{response['command'].decode()}"
f"{response.command.decode()}"
)
def _handle_rssi(self, data: bytes):
"""處理 DB (RSSI) 回應:單 byte 無號值,單位 dBm"""
if data:
print(f"[{self.serial_port}] RSSI = -{data[0]} dBm") # dev
pass
# if data:
# print(f"[{self.serial_port}] RSSI = -{data[0]} dBm") # dev
# logger.debug(f"[{self.serial_port}] RSSI = -{data[0]} dBm") # dev
def _handle_serial_high(self, data: bytes):
"""處理 SH (Serial Number High)"""
pass
@ -644,13 +666,11 @@ class serial_manager:
ret[key] = obj.serial_port
return ret
def send_at_command(self, serial_id, command: bytes, parameter: bytes = b'', frame_id: int = 0x52) -> bool:
def send_at_command(self, serial_id, request: ATRequest) -> bool:
"""
對指定 serial_id XBee dongle 發送一筆 AT 指令thread-safe
- serial_id: create_serial_link 取得的編號
- command: 例如 b'DB'
- parameter: 指令參數讀取型指令通常為空
- frame_id: XBee frame id0x00 代表不要求回應
- request: ATRequest 物件攜帶 command / parameter / frame_id
回傳是否成功排進事件圈
"""
if not self.running or not self.loop:
@ -670,9 +690,7 @@ class serial_manager:
return False
at_handler = serial_obj.serial_handler.processor.at_handler
self.loop.call_soon_threadsafe(
at_handler.send_command, command, parameter, frame_id
)
self.loop.call_soon_threadsafe(at_handler.send_command, request)
return True
@staticmethod
@ -727,8 +745,9 @@ if __name__ == '__main__':
# 等 connection_made 完成 writer 注入,再發一筆 AT 指令測試
time.sleep(5)
rssi_request = ATRequest(command=b'DB', parameter=b'', frame_id=0x52)
for i in range(60):
sm.send_at_command(1, b'DB', frame_id=0x52)
sm.send_at_command(1, rssi_request)
time.sleep(1)
sm.remove_serial_link(1)

Loading…
Cancel
Save