add vehicleDiagShow.py for vehicle diagnostics
- Introduced vehicleDiagShow.py, 這個簡易的終端機介面 顯示載具的自檢驗與狀態master
parent
bc841156cb
commit
ab616ceb54
@ -1,193 +0,0 @@
|
||||
import asyncio
|
||||
import serial_asyncio
|
||||
import struct
|
||||
import os
|
||||
import sys
|
||||
import serial
|
||||
import signal
|
||||
import traceback
|
||||
from pymavlink import mavutil
|
||||
|
||||
# === 設定區 ===
|
||||
SERIAL_PORT = 'COM15' # 手動指定
|
||||
SERIAL_BAUDRATE = 57600
|
||||
UDP_REMOTE_IP = '127.0.0.1'
|
||||
UDP_REMOTE_PORT = 14550
|
||||
DEBUG_MODE = False
|
||||
TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
|
||||
|
||||
# === 工具函數 ===
|
||||
def check_serial_port():
|
||||
try:
|
||||
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE)
|
||||
ser.close()
|
||||
return True
|
||||
except serial.SerialException as e:
|
||||
print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}")
|
||||
return False
|
||||
|
||||
def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes:
|
||||
frame_type = 0x10
|
||||
dest_addr16 = b'\xFF\xFE'
|
||||
broadcast_radius = 0x00
|
||||
options = 0x00
|
||||
|
||||
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
|
||||
frame += dest_addr64 + dest_addr16
|
||||
frame += struct.pack(">BB", broadcast_radius, options) + data
|
||||
checksum = 0xFF - (sum(frame) & 0xFF)
|
||||
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
|
||||
|
||||
# === Serial Protocol 實作 ===
|
||||
class SerialToUDP(asyncio.Protocol):
|
||||
def __init__(self, udp_protocol):
|
||||
self.udp_protocol = udp_protocol
|
||||
self.buffer = bytearray()
|
||||
|
||||
def connection_made(self, transport):
|
||||
self.transport = transport
|
||||
if hasattr(self.udp_protocol, 'set_serial_transport'):
|
||||
self.udp_protocol.set_serial_transport(self)
|
||||
print(f"Serial connection established on {SERIAL_PORT}")
|
||||
|
||||
def data_received(self, data):
|
||||
self.buffer.extend(data)
|
||||
while True:
|
||||
if len(self.buffer) < 3:
|
||||
return
|
||||
if self.buffer[0] != 0x7E:
|
||||
self.buffer.pop(0)
|
||||
continue
|
||||
length = (self.buffer[1] << 8) | self.buffer[2]
|
||||
full_length = 3 + length + 1
|
||||
if len(self.buffer) < full_length:
|
||||
return
|
||||
frame = self.buffer[:full_length]
|
||||
del self.buffer[:full_length]
|
||||
if hasattr(self.udp_protocol, 'send_udp'):
|
||||
self.udp_protocol.send_udp(bytes(frame))
|
||||
|
||||
def write_to_serial(self, data):
|
||||
try:
|
||||
api_frame = build_api_tx_frame(data, TARGET_ADDR64)
|
||||
pass
|
||||
self.transport.write(api_frame)
|
||||
except Exception as e:
|
||||
print(f"[TX Error] 無法封裝或傳送資料: {e}")
|
||||
|
||||
# === UDP Protocol 實作 ===
|
||||
class UDPHandler(asyncio.DatagramProtocol):
|
||||
def __init__(self):
|
||||
self.serial_transport = None
|
||||
self.transport = None
|
||||
self.mav_decoder = mavutil.mavlink.MAVLink(None)
|
||||
|
||||
def connection_made(self, transport):
|
||||
self.transport = transport
|
||||
print("UDP transport ready.")
|
||||
|
||||
def set_serial_transport(self, serial_transport):
|
||||
self.serial_transport = serial_transport
|
||||
|
||||
def datagram_received(self, data, addr):
|
||||
if self.serial_transport:
|
||||
self.serial_transport.write_to_serial(data)
|
||||
|
||||
def send_udp(self, data):
|
||||
decoded_data = self.decapsulate_data(data)
|
||||
if decoded_data is None:
|
||||
pass
|
||||
return
|
||||
self.decode_mavlink_data(decoded_data)
|
||||
if self.transport:
|
||||
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
|
||||
|
||||
def decapsulate_data(self, data):
|
||||
try:
|
||||
if not data or data[0] != 0x7E:
|
||||
return None
|
||||
length = (data[1] << 8) | data[2]
|
||||
if len(data) < length + 4:
|
||||
return None
|
||||
frame_type = data[3]
|
||||
if frame_type == 0x90:
|
||||
rf_data_start = 3 + 12
|
||||
return data[rf_data_start:3 + length]
|
||||
else:
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"[XBee 解封錯誤] {e}")
|
||||
return None
|
||||
|
||||
def decode_mavlink_data(self, data):
|
||||
try:
|
||||
msg = self.mav_decoder.parse_char(data)
|
||||
if msg:
|
||||
if msg.get_type() == "HEARTBEAT":
|
||||
pass # 不輸出任何訊息
|
||||
else:
|
||||
pass
|
||||
except Exception as e:
|
||||
print(f"[MAVLink Decode Error] {e}")
|
||||
|
||||
# === 主流程 ===
|
||||
async def main():
|
||||
if not check_serial_port():
|
||||
print("程式終止:串口檢查失敗")
|
||||
return
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
if os.name != 'nt': # Windows 不支援 add_signal_handler
|
||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||
loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop)))
|
||||
|
||||
udp_handler = UDPHandler()
|
||||
|
||||
try:
|
||||
udp_transport, _ = await loop.create_datagram_endpoint(
|
||||
lambda: udp_handler,
|
||||
local_addr=('0.0.0.0', 0)
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"無法創建 UDP 端點:{str(e)}")
|
||||
return
|
||||
|
||||
sock = udp_transport.get_extra_info('socket')
|
||||
print(f"UDP listening on {sock.getsockname()}")
|
||||
|
||||
try:
|
||||
serial_proto = SerialToUDP(udp_handler)
|
||||
await serial_asyncio.create_serial_connection(
|
||||
loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"無法建立串口連接:{str(e)}")
|
||||
traceback.print_exc()
|
||||
udp_transport.close()
|
||||
return
|
||||
|
||||
print("等待串口資料...")
|
||||
try:
|
||||
await asyncio.Future()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
async def shutdown(loop):
|
||||
print("Shutting down...")
|
||||
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
loop.stop()
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
print("程式被使用者中斷")
|
||||
except Exception as e:
|
||||
print("程式執行錯誤:")
|
||||
traceback.print_exc()
|
||||
@ -0,0 +1,544 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Standalone curses TUI for fc_network vehicle diagnostics (sys_diags, status_text, summary)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import curses
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from std_msgs.msg import String
|
||||
|
||||
from fc_interfaces.msg import SystemDiagnosticsRaw
|
||||
|
||||
VEHICLE_TOPIC_RE = re.compile(
|
||||
r'^/fc_network/vehicle/(sys\d+)/(sys_diags|status_text|summary)$'
|
||||
)
|
||||
STATUS_TEXT_RE = re.compile(r'^\[([\d.]+)\]\s*\[(-?\d+)\]\s*(.*)$')
|
||||
|
||||
STATUS_TEXT_TTL_SEC = 30.0
|
||||
STALE_SEC = 3.0
|
||||
RESCAN_SEC = 2.0
|
||||
OFFLINE_SEC = 10.0
|
||||
DRAW_HZ = 5.0
|
||||
|
||||
# MAV_SYS_STATUS_SENSOR bit labels (MAVLink common)
|
||||
SENSOR_BITS: Tuple[Tuple[int, str], ...] = (
|
||||
(268435456, 'PreArm'),
|
||||
(1, 'Gyro'),
|
||||
(2, 'Accel'),
|
||||
(4, 'Mag'),
|
||||
(8, 'Baro'),
|
||||
(16, 'DiffPress'),
|
||||
(32, 'GPS'),
|
||||
(64, 'OptFlow'),
|
||||
(128, 'Vision'),
|
||||
(256, 'Laser'),
|
||||
(512, 'ExtGT'),
|
||||
(1024, 'AngRate'),
|
||||
(2048, 'AttStab'),
|
||||
(4096, 'YawPos'),
|
||||
(8192, 'AltCtrl'),
|
||||
(16384, 'XYCtrl'),
|
||||
(32768, 'Motor'),
|
||||
(65536, 'RC'),
|
||||
(131072, 'Gyro2'),
|
||||
(262144, 'Accel2'),
|
||||
(524288, 'Mag2'),
|
||||
(1048576, 'Geofence'),
|
||||
(2097152, 'AHRS'),
|
||||
(4194304, 'Terrain'),
|
||||
(8388608, 'RevMotor'),
|
||||
(16777216, 'Log'),
|
||||
(33554432, 'Batt'),
|
||||
(67108864, 'Prox'),
|
||||
(134217728, 'Satcom'),
|
||||
(536870912, 'ObsAvoid'),
|
||||
(1073741824, 'Propulsion'),
|
||||
)
|
||||
|
||||
MAV_STATE_LABELS = {
|
||||
0: 'UNINIT',
|
||||
1: 'BOOT',
|
||||
2: 'CAL',
|
||||
3: 'STANDBY',
|
||||
4: 'ACTIVE',
|
||||
5: 'CRITICAL',
|
||||
6: 'EMERGENCY',
|
||||
7: 'OFF',
|
||||
8: 'TERM',
|
||||
}
|
||||
|
||||
SEVERITY_LABELS = {
|
||||
0: 'EMERG',
|
||||
1: 'ALERT',
|
||||
2: 'CRIT',
|
||||
3: 'ERROR',
|
||||
4: 'WARN',
|
||||
5: 'NOTICE',
|
||||
6: 'INFO',
|
||||
7: 'DEBUG',
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class StatusTextEntry:
|
||||
received_at: float
|
||||
severity: int
|
||||
text: str
|
||||
raw: str
|
||||
sys_key: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class VehicleDiagState:
|
||||
sys_key: str
|
||||
sysid: int
|
||||
summary: Optional[dict] = None
|
||||
diag: Optional[SystemDiagnosticsRaw] = None
|
||||
status_entries: List[StatusTextEntry] = field(default_factory=list)
|
||||
last_summary_ts: float = 0.0
|
||||
last_diag_ts: float = 0.0
|
||||
last_topic_seen_ts: float = 0.0
|
||||
offline: bool = False
|
||||
|
||||
|
||||
def sys_key_to_id(sys_key: str) -> int:
|
||||
return int(sys_key.replace('sys', '', 1))
|
||||
|
||||
|
||||
def purge_status_entries(entries: List[StatusTextEntry], now: float, ttl: float = STATUS_TEXT_TTL_SEC) -> None:
|
||||
cutoff = now - ttl
|
||||
entries[:] = [e for e in entries if e.received_at >= cutoff]
|
||||
|
||||
|
||||
def parse_status_text(raw: str, sys_key: str, received_at: float) -> StatusTextEntry:
|
||||
match = STATUS_TEXT_RE.match(raw.strip())
|
||||
if match:
|
||||
severity = int(match.group(2))
|
||||
text = match.group(3)
|
||||
else:
|
||||
severity = -1
|
||||
text = raw
|
||||
return StatusTextEntry(
|
||||
received_at=received_at,
|
||||
severity=severity,
|
||||
text=text,
|
||||
raw=raw,
|
||||
sys_key=sys_key,
|
||||
)
|
||||
|
||||
|
||||
def decode_sensors(install: int, enabled: int, health: int) -> List[Tuple[str, str]]:
|
||||
"""Return list of (label, status) where status is OK / FAIL / OFF."""
|
||||
items: List[Tuple[str, str]] = []
|
||||
for bit, label in SENSOR_BITS:
|
||||
if not (install & bit):
|
||||
continue
|
||||
if not (enabled & bit):
|
||||
items.append((label, 'OFF'))
|
||||
elif health & bit:
|
||||
items.append((label, 'OK'))
|
||||
else:
|
||||
items.append((label, 'FAIL'))
|
||||
return items
|
||||
|
||||
|
||||
def format_age(seconds: float) -> str:
|
||||
if seconds < 0:
|
||||
return 'never'
|
||||
if seconds < 1.0:
|
||||
return f'{seconds * 1000:.0f}ms'
|
||||
if seconds < 60.0:
|
||||
return f'{seconds:.1f}s'
|
||||
return f'{seconds / 60:.1f}m'
|
||||
|
||||
|
||||
def truncate(text: str, width: int) -> str:
|
||||
if width <= 0:
|
||||
return ''
|
||||
if len(text) <= width:
|
||||
return text
|
||||
if width <= 3:
|
||||
return text[:width]
|
||||
return text[: width - 3] + '...'
|
||||
|
||||
|
||||
class VehicleDiagNode(Node):
|
||||
"""ROS2 node: discover vehicles and subscribe to diagnostic topics."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__('vehicle_diag_show')
|
||||
self._lock = threading.Lock()
|
||||
self._vehicles: Dict[str, VehicleDiagState] = {}
|
||||
self._subs: Dict[str, Dict[str, object]] = {}
|
||||
self._scan_timer = self.create_timer(RESCAN_SEC, self._rescan_topics)
|
||||
self._rescan_topics()
|
||||
|
||||
def snapshot(self) -> Dict[str, VehicleDiagState]:
|
||||
now = time.monotonic()
|
||||
with self._lock:
|
||||
for state in self._vehicles.values():
|
||||
purge_status_entries(state.status_entries, now)
|
||||
if state.last_topic_seen_ts > 0:
|
||||
state.offline = (now - state.last_topic_seen_ts) > OFFLINE_SEC
|
||||
return {k: state for k, state in self._vehicles.items()}
|
||||
|
||||
def _get_or_create_state(self, sys_key: str) -> VehicleDiagState:
|
||||
if sys_key not in self._vehicles:
|
||||
self._vehicles[sys_key] = VehicleDiagState(
|
||||
sys_key=sys_key,
|
||||
sysid=sys_key_to_id(sys_key),
|
||||
)
|
||||
return self._vehicles[sys_key]
|
||||
|
||||
def _rescan_topics(self) -> None:
|
||||
now = time.monotonic()
|
||||
found: set = set()
|
||||
for topic_name, _ in self.get_topic_names_and_types():
|
||||
match = VEHICLE_TOPIC_RE.match(topic_name)
|
||||
if not match:
|
||||
continue
|
||||
sys_key = match.group(1)
|
||||
found.add(sys_key)
|
||||
with self._lock:
|
||||
state = self._get_or_create_state(sys_key)
|
||||
state.last_topic_seen_ts = now
|
||||
state.offline = False
|
||||
self._ensure_subscriptions(sys_key)
|
||||
|
||||
with self._lock:
|
||||
for sys_key, state in self._vehicles.items():
|
||||
if sys_key not in found and state.last_topic_seen_ts > 0:
|
||||
state.offline = (now - state.last_topic_seen_ts) > OFFLINE_SEC
|
||||
|
||||
def _ensure_subscriptions(self, sys_key: str) -> None:
|
||||
if sys_key in self._subs:
|
||||
return
|
||||
base = f'/fc_network/vehicle/{sys_key}'
|
||||
subs = {
|
||||
'summary': self.create_subscription(
|
||||
String,
|
||||
f'{base}/summary',
|
||||
lambda msg, sk=sys_key: self._on_summary(sk, msg),
|
||||
10,
|
||||
),
|
||||
'sys_diags': self.create_subscription(
|
||||
SystemDiagnosticsRaw,
|
||||
f'{base}/sys_diags',
|
||||
lambda msg, sk=sys_key: self._on_sys_diags(sk, msg),
|
||||
10,
|
||||
),
|
||||
'status_text': self.create_subscription(
|
||||
String,
|
||||
f'{base}/status_text',
|
||||
lambda msg, sk=sys_key: self._on_status_text(sk, msg),
|
||||
10,
|
||||
),
|
||||
}
|
||||
with self._lock:
|
||||
self._subs[sys_key] = subs
|
||||
|
||||
def _on_summary(self, sys_key: str, msg: String) -> None:
|
||||
now = time.monotonic()
|
||||
try:
|
||||
summary = json.loads(msg.data)
|
||||
except json.JSONDecodeError:
|
||||
summary = {'raw': msg.data}
|
||||
with self._lock:
|
||||
state = self._get_or_create_state(sys_key)
|
||||
state.summary = summary
|
||||
state.last_summary_ts = now
|
||||
state.last_topic_seen_ts = now
|
||||
|
||||
def _on_sys_diags(self, sys_key: str, msg: SystemDiagnosticsRaw) -> None:
|
||||
now = time.monotonic()
|
||||
with self._lock:
|
||||
state = self._get_or_create_state(sys_key)
|
||||
state.diag = msg
|
||||
state.last_diag_ts = now
|
||||
state.last_topic_seen_ts = now
|
||||
|
||||
def _on_status_text(self, sys_key: str, msg: String) -> None:
|
||||
now = time.monotonic()
|
||||
entry = parse_status_text(msg.data, sys_key, now)
|
||||
with self._lock:
|
||||
state = self._get_or_create_state(sys_key)
|
||||
state.status_entries.append(entry)
|
||||
purge_status_entries(state.status_entries, now)
|
||||
state.last_topic_seen_ts = now
|
||||
|
||||
|
||||
class DiagCursesApp:
|
||||
"""Curses front-end for vehicle diagnostics."""
|
||||
|
||||
def __init__(self, node: VehicleDiagNode) -> None:
|
||||
self._node = node
|
||||
self._running = True
|
||||
|
||||
def run(self) -> None:
|
||||
curses.wrapper(self._main)
|
||||
|
||||
def _init_colors(self) -> None:
|
||||
curses.start_color()
|
||||
curses.use_default_colors()
|
||||
curses.init_pair(1, curses.COLOR_GREEN, -1) # OK / DISARMED / INFO
|
||||
curses.init_pair(2, curses.COLOR_RED, -1) # FAIL / ARMED / ERROR+
|
||||
curses.init_pair(3, curses.COLOR_YELLOW, -1) # WARN / OFF
|
||||
curses.init_pair(4, curses.COLOR_CYAN, -1) # header / INFO
|
||||
curses.init_pair(5, curses.COLOR_WHITE, -1) # dim / offline
|
||||
curses.init_pair(6, curses.COLOR_MAGENTA, -1) # NOTICE
|
||||
|
||||
def _main(self, stdscr) -> None:
|
||||
curses.curs_set(0)
|
||||
stdscr.nodelay(True)
|
||||
stdscr.keypad(True)
|
||||
self._init_colors()
|
||||
|
||||
while self._running and rclpy.ok():
|
||||
try:
|
||||
self._draw(stdscr)
|
||||
except curses.error:
|
||||
pass
|
||||
key = stdscr.getch()
|
||||
if key in (ord('q'), ord('Q'), 27):
|
||||
self._running = False
|
||||
time.sleep(1.0 / DRAW_HZ)
|
||||
|
||||
def _draw(self, stdscr) -> None:
|
||||
stdscr.erase()
|
||||
height, width = stdscr.getmaxyx()
|
||||
now = time.monotonic()
|
||||
vehicles = self._node.snapshot()
|
||||
sorted_keys = sorted(vehicles.keys(), key=sys_key_to_id)
|
||||
|
||||
header = f' Vehicle Diagnostics Monitor vehicles:{len(sorted_keys)} Q:quit '
|
||||
stdscr.attron(curses.color_pair(4) | curses.A_BOLD)
|
||||
stdscr.addstr(0, 0, truncate(header.ljust(width - 1), width - 1))
|
||||
stdscr.attroff(curses.color_pair(4) | curses.A_BOLD)
|
||||
|
||||
row = 1
|
||||
if not sorted_keys:
|
||||
msg = ' Waiting for vehicles... (need /fc_network/vehicle/sysN/* topics) '
|
||||
if row < height - 1:
|
||||
stdscr.addstr(row, max(0, (width - len(msg)) // 2), truncate(msg, width - 1))
|
||||
row += 2
|
||||
|
||||
vehicle_rows_end = max(row, height - 8)
|
||||
for sys_key in sorted_keys:
|
||||
if row >= vehicle_rows_end:
|
||||
break
|
||||
state = vehicles[sys_key]
|
||||
row = self._draw_vehicle_block(stdscr, row, width, state, now)
|
||||
if row < height - 1:
|
||||
try:
|
||||
stdscr.addstr(row, 0, truncate('-' * (width - 1), width - 1))
|
||||
except curses.error:
|
||||
pass
|
||||
row += 1
|
||||
|
||||
log_title = ' Status Text (last 30s) '
|
||||
if row < height - 1:
|
||||
stdscr.attron(curses.color_pair(4))
|
||||
stdscr.addstr(row, 0, truncate(log_title.ljust(width - 1), width - 1))
|
||||
stdscr.attroff(curses.color_pair(4))
|
||||
row += 1
|
||||
|
||||
all_entries: List[StatusTextEntry] = []
|
||||
for sys_key in sorted_keys:
|
||||
all_entries.extend(vehicles[sys_key].status_entries)
|
||||
all_entries.sort(key=lambda e: e.received_at)
|
||||
|
||||
log_rows = height - row - 1
|
||||
if log_rows <= 0:
|
||||
stdscr.refresh()
|
||||
return
|
||||
|
||||
if not all_entries:
|
||||
empty = ' (no messages in last 30s) '
|
||||
if row < height - 1:
|
||||
stdscr.addstr(row, 0, truncate(empty, width - 1))
|
||||
else:
|
||||
visible = all_entries[-log_rows:]
|
||||
for entry in visible:
|
||||
if row >= height - 1:
|
||||
break
|
||||
self._draw_status_line(stdscr, row, width, entry)
|
||||
row += 1
|
||||
|
||||
stdscr.refresh()
|
||||
|
||||
def _draw_vehicle_block(
|
||||
self,
|
||||
stdscr,
|
||||
row: int,
|
||||
width: int,
|
||||
state: VehicleDiagState,
|
||||
now: float,
|
||||
) -> int:
|
||||
summary = state.summary or {}
|
||||
mode = summary.get('mode_name', 'UNKNOWN')
|
||||
armed = summary.get('armed', False)
|
||||
conn = summary.get('connection_type', '?')
|
||||
socket_id = summary.get('socket_id', -1)
|
||||
mav_status = summary.get('mav_status', 0)
|
||||
mav_label = MAV_STATE_LABELS.get(int(mav_status), str(mav_status))
|
||||
|
||||
last_update = max(state.last_summary_ts, state.last_diag_ts)
|
||||
age_sec = (now - last_update) if last_update > 0 else -1.0
|
||||
stale = age_sec >= 0 and age_sec > STALE_SEC
|
||||
|
||||
armed_str = 'ARMED' if armed else 'DISARMED'
|
||||
flags = []
|
||||
if state.offline:
|
||||
flags.append('OFFLINE')
|
||||
if stale and not state.offline:
|
||||
flags.append('STALE')
|
||||
flag_str = f" [{' '.join(flags)}]" if flags else ''
|
||||
|
||||
line1 = (
|
||||
f' {state.sys_key} (MAV {state.sysid}) {mode} {armed_str} '
|
||||
f'{conn} socket:{socket_id} {mav_label} '
|
||||
f'updated {format_age(age_sec)}{flag_str}'
|
||||
)
|
||||
if row < stdscr.getmaxyx()[0] - 1:
|
||||
if state.offline:
|
||||
stdscr.attron(curses.color_pair(5))
|
||||
stdscr.addstr(row, 0, truncate(line1, width - 1))
|
||||
stdscr.attroff(curses.color_pair(5))
|
||||
else:
|
||||
stdscr.addstr(row, 0, truncate(line1, width - 1))
|
||||
armed_pos = line1.find(armed_str)
|
||||
if armed_pos >= 0:
|
||||
pair = curses.color_pair(2) if armed else curses.color_pair(1)
|
||||
stdscr.attron(pair | curses.A_BOLD)
|
||||
stdscr.addstr(row, armed_pos, armed_str)
|
||||
stdscr.attroff(pair | curses.A_BOLD)
|
||||
row += 1
|
||||
|
||||
if state.diag is not None and row < stdscr.getmaxyx()[0] - 1:
|
||||
load_pct = state.diag.mcu_load / 10.0
|
||||
drop_pct = state.diag.bus_error_rate / 10.0
|
||||
line2 = f' MCU {load_pct:.1f}% CommDrop {drop_pct:.1f}%'
|
||||
stdscr.addstr(row, 0, truncate(line2, width - 1))
|
||||
row += 1
|
||||
|
||||
sensors = decode_sensors(
|
||||
state.diag.sensors_install_mask,
|
||||
state.diag.sensors_enabled_mask,
|
||||
state.diag.sensors_health_mask,
|
||||
)
|
||||
row = self._draw_sensor_line(stdscr, row, width, sensors)
|
||||
elif row < stdscr.getmaxyx()[0] - 1:
|
||||
stdscr.addstr(row, 0, truncate(' (no sys_diags yet)', width - 1))
|
||||
row += 1
|
||||
|
||||
return row
|
||||
|
||||
def _draw_sensor_line(
|
||||
self,
|
||||
stdscr,
|
||||
row: int,
|
||||
width: int,
|
||||
sensors: List[Tuple[str, str]],
|
||||
) -> int:
|
||||
max_row, _ = stdscr.getmaxyx()
|
||||
if not sensors:
|
||||
if row < max_row - 1:
|
||||
stdscr.addstr(row, 0, truncate(' Sensors: (none installed)', width - 1))
|
||||
return row + 1
|
||||
|
||||
lines: List[List[Tuple[str, str]]] = [[]]
|
||||
line_len = len(' Sensors: ')
|
||||
for label, status in sensors:
|
||||
token = f'{label}:{status} '
|
||||
if lines[-1] and line_len + len(token) >= width - 1:
|
||||
lines.append([])
|
||||
line_len = 2
|
||||
lines[-1].append((label, status))
|
||||
line_len += len(token)
|
||||
|
||||
for idx, line_items in enumerate(lines):
|
||||
if row >= max_row - 1:
|
||||
break
|
||||
prefix = ' Sensors: ' if idx == 0 else ' '
|
||||
col = 0
|
||||
stdscr.addstr(row, col, prefix)
|
||||
col += len(prefix)
|
||||
for label, status in line_items:
|
||||
token = f'{label}:{status} '
|
||||
if col + len(token) >= width - 1:
|
||||
stdscr.addstr(row, col, truncate('...', width - col - 1))
|
||||
return row + 1
|
||||
pair = curses.A_NORMAL
|
||||
if status == 'OK':
|
||||
pair = curses.color_pair(1)
|
||||
elif status == 'FAIL':
|
||||
pair = curses.color_pair(2) | curses.A_BOLD
|
||||
elif status == 'OFF':
|
||||
pair = curses.color_pair(3)
|
||||
stdscr.attron(pair)
|
||||
stdscr.addstr(row, col, token)
|
||||
stdscr.attroff(pair)
|
||||
col += len(token)
|
||||
row += 1
|
||||
return row
|
||||
|
||||
def _draw_status_line(
|
||||
self,
|
||||
stdscr,
|
||||
row: int,
|
||||
width: int,
|
||||
entry: StatusTextEntry,
|
||||
) -> None:
|
||||
sev_label = SEVERITY_LABELS.get(entry.severity, str(entry.severity))
|
||||
prefix = f'[{entry.sys_key:5}] [{entry.severity} {sev_label:5}] '
|
||||
body = truncate(entry.text, max(0, width - len(prefix) - 2))
|
||||
line = prefix + body
|
||||
|
||||
pair = curses.A_NORMAL
|
||||
if entry.severity <= 3:
|
||||
pair = curses.color_pair(2)
|
||||
elif entry.severity == 4:
|
||||
pair = curses.color_pair(3)
|
||||
elif entry.severity == 5:
|
||||
pair = curses.color_pair(6)
|
||||
elif entry.severity == 6:
|
||||
pair = curses.color_pair(1)
|
||||
elif entry.severity == 7:
|
||||
pair = curses.color_pair(5)
|
||||
|
||||
stdscr.attron(pair)
|
||||
stdscr.addstr(row, 0, truncate(line, width - 1))
|
||||
stdscr.attroff(pair)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
if not sys.stdout.isatty():
|
||||
print('vehicleDiagShow requires an interactive terminal (TTY).', file=sys.stderr)
|
||||
return 1
|
||||
|
||||
rclpy.init()
|
||||
node = VehicleDiagNode()
|
||||
spin_thread = threading.Thread(target=rclpy.spin, args=(node,), daemon=True)
|
||||
spin_thread.start()
|
||||
|
||||
try:
|
||||
DiagCursesApp(node).run()
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
spin_thread.join(timeout=2.0)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
||||
Loading…
Reference in New Issue