From ab616ceb5445ed07c0556e30c52a31175f2c22ac Mon Sep 17 00:00:00 2001 From: Chiyu Chen Date: Thu, 11 Jun 2026 09:09:42 +0800 Subject: [PATCH] add vehicleDiagShow.py for vehicle diagnostics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Introduced vehicleDiagShow.py, 這個簡易的終端機介面 顯示載具的自檢驗與狀態 --- README.md | 2 + serial_udp_update.py | 193 --------- src/fc_network_apps/vehicleDiagShow.py | 544 +++++++++++++++++++++++++ 3 files changed, 546 insertions(+), 193 deletions(-) delete mode 100644 serial_udp_update.py create mode 100644 src/fc_network_apps/vehicleDiagShow.py diff --git a/README.md b/README.md index 2684f10..37a050c 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,8 @@ cd ~/AirTrapMine/src/ # 這是範例!!! python -m fc_network_adapter.fc_network_adapter.mainOrchestrator python -m fc_network_adapter.tests.demo_integration python -m someotherpkg.src.example_wholeMoving + +python fc_network_apps/vehicleDiagShow.py ``` 2. diff --git a/serial_udp_update.py b/serial_udp_update.py deleted file mode 100644 index 728adca..0000000 --- a/serial_udp_update.py +++ /dev/null @@ -1,193 +0,0 @@ -import asyncio -import serial_asyncio -import struct -import os -import sys -import serial -import signal -import traceback -from pymavlink import mavutil - -# === 設定區 === -SERIAL_PORT = 'COM15' # 手動指定 -SERIAL_BAUDRATE = 57600 -UDP_REMOTE_IP = '127.0.0.1' -UDP_REMOTE_PORT = 14550 -DEBUG_MODE = False -TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 - -# === 工具函數 === -def check_serial_port(): - try: - ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) - ser.close() - return True - except serial.SerialException as e: - print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") - return False - except Exception as e: - print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") - return False - -def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: - frame_type = 0x10 - dest_addr16 = b'\xFF\xFE' - broadcast_radius = 0x00 - options = 0x00 - - frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) - frame += dest_addr64 + dest_addr16 - frame += struct.pack(">BB", broadcast_radius, options) + data - checksum = 0xFF - (sum(frame) & 0xFF) - return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) - -# === Serial Protocol 實作 === -class SerialToUDP(asyncio.Protocol): - def __init__(self, udp_protocol): - self.udp_protocol = udp_protocol - self.buffer = bytearray() - - def connection_made(self, transport): - self.transport = transport - if hasattr(self.udp_protocol, 'set_serial_transport'): - self.udp_protocol.set_serial_transport(self) - print(f"Serial connection established on {SERIAL_PORT}") - - def data_received(self, data): - self.buffer.extend(data) - while True: - if len(self.buffer) < 3: - return - if self.buffer[0] != 0x7E: - self.buffer.pop(0) - continue - length = (self.buffer[1] << 8) | self.buffer[2] - full_length = 3 + length + 1 - if len(self.buffer) < full_length: - return - frame = self.buffer[:full_length] - del self.buffer[:full_length] - if hasattr(self.udp_protocol, 'send_udp'): - self.udp_protocol.send_udp(bytes(frame)) - - def write_to_serial(self, data): - try: - api_frame = build_api_tx_frame(data, TARGET_ADDR64) - pass - self.transport.write(api_frame) - except Exception as e: - print(f"[TX Error] 無法封裝或傳送資料: {e}") - -# === UDP Protocol 實作 === -class UDPHandler(asyncio.DatagramProtocol): - def __init__(self): - self.serial_transport = None - self.transport = None - self.mav_decoder = mavutil.mavlink.MAVLink(None) - - def connection_made(self, transport): - self.transport = transport - print("UDP transport ready.") - - def set_serial_transport(self, serial_transport): - self.serial_transport = serial_transport - - def datagram_received(self, data, addr): - if self.serial_transport: - self.serial_transport.write_to_serial(data) - - def send_udp(self, data): - decoded_data = self.decapsulate_data(data) - if decoded_data is None: - pass - return - self.decode_mavlink_data(decoded_data) - if self.transport: - self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) - - def decapsulate_data(self, data): - try: - if not data or data[0] != 0x7E: - return None - length = (data[1] << 8) | data[2] - if len(data) < length + 4: - return None - frame_type = data[3] - if frame_type == 0x90: - rf_data_start = 3 + 12 - return data[rf_data_start:3 + length] - else: - return None - except Exception as e: - print(f"[XBee 解封錯誤] {e}") - return None - - def decode_mavlink_data(self, data): - try: - msg = self.mav_decoder.parse_char(data) - if msg: - if msg.get_type() == "HEARTBEAT": - pass # 不輸出任何訊息 - else: - pass - except Exception as e: - print(f"[MAVLink Decode Error] {e}") - -# === 主流程 === -async def main(): - if not check_serial_port(): - print("程式終止:串口檢查失敗") - return - - loop = asyncio.get_running_loop() - if os.name != 'nt': # Windows 不支援 add_signal_handler - for sig in (signal.SIGINT, signal.SIGTERM): - loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop))) - - udp_handler = UDPHandler() - - try: - udp_transport, _ = await loop.create_datagram_endpoint( - lambda: udp_handler, - local_addr=('0.0.0.0', 0) - ) - except Exception as e: - print(f"無法創建 UDP 端點:{str(e)}") - return - - sock = udp_transport.get_extra_info('socket') - print(f"UDP listening on {sock.getsockname()}") - - try: - serial_proto = SerialToUDP(udp_handler) - await serial_asyncio.create_serial_connection( - loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE - ) - except Exception as e: - print(f"無法建立串口連接:{str(e)}") - traceback.print_exc() - udp_transport.close() - return - - print("等待串口資料...") - try: - await asyncio.Future() - except asyncio.CancelledError: - pass - -async def shutdown(loop): - print("Shutting down...") - tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) - loop.stop() - -if __name__ == '__main__': - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("程式被使用者中斷") - except Exception as e: - print("程式執行錯誤:") - traceback.print_exc() diff --git a/src/fc_network_apps/vehicleDiagShow.py b/src/fc_network_apps/vehicleDiagShow.py new file mode 100644 index 0000000..b692948 --- /dev/null +++ b/src/fc_network_apps/vehicleDiagShow.py @@ -0,0 +1,544 @@ +#!/usr/bin/env python3 +"""Standalone curses TUI for fc_network vehicle diagnostics (sys_diags, status_text, summary).""" + +from __future__ import annotations + +import curses +import json +import re +import sys +import threading +import time +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple + +import rclpy +from rclpy.node import Node +from std_msgs.msg import String + +from fc_interfaces.msg import SystemDiagnosticsRaw + +VEHICLE_TOPIC_RE = re.compile( + r'^/fc_network/vehicle/(sys\d+)/(sys_diags|status_text|summary)$' +) +STATUS_TEXT_RE = re.compile(r'^\[([\d.]+)\]\s*\[(-?\d+)\]\s*(.*)$') + +STATUS_TEXT_TTL_SEC = 30.0 +STALE_SEC = 3.0 +RESCAN_SEC = 2.0 +OFFLINE_SEC = 10.0 +DRAW_HZ = 5.0 + +# MAV_SYS_STATUS_SENSOR bit labels (MAVLink common) +SENSOR_BITS: Tuple[Tuple[int, str], ...] = ( + (268435456, 'PreArm'), + (1, 'Gyro'), + (2, 'Accel'), + (4, 'Mag'), + (8, 'Baro'), + (16, 'DiffPress'), + (32, 'GPS'), + (64, 'OptFlow'), + (128, 'Vision'), + (256, 'Laser'), + (512, 'ExtGT'), + (1024, 'AngRate'), + (2048, 'AttStab'), + (4096, 'YawPos'), + (8192, 'AltCtrl'), + (16384, 'XYCtrl'), + (32768, 'Motor'), + (65536, 'RC'), + (131072, 'Gyro2'), + (262144, 'Accel2'), + (524288, 'Mag2'), + (1048576, 'Geofence'), + (2097152, 'AHRS'), + (4194304, 'Terrain'), + (8388608, 'RevMotor'), + (16777216, 'Log'), + (33554432, 'Batt'), + (67108864, 'Prox'), + (134217728, 'Satcom'), + (536870912, 'ObsAvoid'), + (1073741824, 'Propulsion'), +) + +MAV_STATE_LABELS = { + 0: 'UNINIT', + 1: 'BOOT', + 2: 'CAL', + 3: 'STANDBY', + 4: 'ACTIVE', + 5: 'CRITICAL', + 6: 'EMERGENCY', + 7: 'OFF', + 8: 'TERM', +} + +SEVERITY_LABELS = { + 0: 'EMERG', + 1: 'ALERT', + 2: 'CRIT', + 3: 'ERROR', + 4: 'WARN', + 5: 'NOTICE', + 6: 'INFO', + 7: 'DEBUG', +} + + +@dataclass +class StatusTextEntry: + received_at: float + severity: int + text: str + raw: str + sys_key: str + + +@dataclass +class VehicleDiagState: + sys_key: str + sysid: int + summary: Optional[dict] = None + diag: Optional[SystemDiagnosticsRaw] = None + status_entries: List[StatusTextEntry] = field(default_factory=list) + last_summary_ts: float = 0.0 + last_diag_ts: float = 0.0 + last_topic_seen_ts: float = 0.0 + offline: bool = False + + +def sys_key_to_id(sys_key: str) -> int: + return int(sys_key.replace('sys', '', 1)) + + +def purge_status_entries(entries: List[StatusTextEntry], now: float, ttl: float = STATUS_TEXT_TTL_SEC) -> None: + cutoff = now - ttl + entries[:] = [e for e in entries if e.received_at >= cutoff] + + +def parse_status_text(raw: str, sys_key: str, received_at: float) -> StatusTextEntry: + match = STATUS_TEXT_RE.match(raw.strip()) + if match: + severity = int(match.group(2)) + text = match.group(3) + else: + severity = -1 + text = raw + return StatusTextEntry( + received_at=received_at, + severity=severity, + text=text, + raw=raw, + sys_key=sys_key, + ) + + +def decode_sensors(install: int, enabled: int, health: int) -> List[Tuple[str, str]]: + """Return list of (label, status) where status is OK / FAIL / OFF.""" + items: List[Tuple[str, str]] = [] + for bit, label in SENSOR_BITS: + if not (install & bit): + continue + if not (enabled & bit): + items.append((label, 'OFF')) + elif health & bit: + items.append((label, 'OK')) + else: + items.append((label, 'FAIL')) + return items + + +def format_age(seconds: float) -> str: + if seconds < 0: + return 'never' + if seconds < 1.0: + return f'{seconds * 1000:.0f}ms' + if seconds < 60.0: + return f'{seconds:.1f}s' + return f'{seconds / 60:.1f}m' + + +def truncate(text: str, width: int) -> str: + if width <= 0: + return '' + if len(text) <= width: + return text + if width <= 3: + return text[:width] + return text[: width - 3] + '...' + + +class VehicleDiagNode(Node): + """ROS2 node: discover vehicles and subscribe to diagnostic topics.""" + + def __init__(self) -> None: + super().__init__('vehicle_diag_show') + self._lock = threading.Lock() + self._vehicles: Dict[str, VehicleDiagState] = {} + self._subs: Dict[str, Dict[str, object]] = {} + self._scan_timer = self.create_timer(RESCAN_SEC, self._rescan_topics) + self._rescan_topics() + + def snapshot(self) -> Dict[str, VehicleDiagState]: + now = time.monotonic() + with self._lock: + for state in self._vehicles.values(): + purge_status_entries(state.status_entries, now) + if state.last_topic_seen_ts > 0: + state.offline = (now - state.last_topic_seen_ts) > OFFLINE_SEC + return {k: state for k, state in self._vehicles.items()} + + def _get_or_create_state(self, sys_key: str) -> VehicleDiagState: + if sys_key not in self._vehicles: + self._vehicles[sys_key] = VehicleDiagState( + sys_key=sys_key, + sysid=sys_key_to_id(sys_key), + ) + return self._vehicles[sys_key] + + def _rescan_topics(self) -> None: + now = time.monotonic() + found: set = set() + for topic_name, _ in self.get_topic_names_and_types(): + match = VEHICLE_TOPIC_RE.match(topic_name) + if not match: + continue + sys_key = match.group(1) + found.add(sys_key) + with self._lock: + state = self._get_or_create_state(sys_key) + state.last_topic_seen_ts = now + state.offline = False + self._ensure_subscriptions(sys_key) + + with self._lock: + for sys_key, state in self._vehicles.items(): + if sys_key not in found and state.last_topic_seen_ts > 0: + state.offline = (now - state.last_topic_seen_ts) > OFFLINE_SEC + + def _ensure_subscriptions(self, sys_key: str) -> None: + if sys_key in self._subs: + return + base = f'/fc_network/vehicle/{sys_key}' + subs = { + 'summary': self.create_subscription( + String, + f'{base}/summary', + lambda msg, sk=sys_key: self._on_summary(sk, msg), + 10, + ), + 'sys_diags': self.create_subscription( + SystemDiagnosticsRaw, + f'{base}/sys_diags', + lambda msg, sk=sys_key: self._on_sys_diags(sk, msg), + 10, + ), + 'status_text': self.create_subscription( + String, + f'{base}/status_text', + lambda msg, sk=sys_key: self._on_status_text(sk, msg), + 10, + ), + } + with self._lock: + self._subs[sys_key] = subs + + def _on_summary(self, sys_key: str, msg: String) -> None: + now = time.monotonic() + try: + summary = json.loads(msg.data) + except json.JSONDecodeError: + summary = {'raw': msg.data} + with self._lock: + state = self._get_or_create_state(sys_key) + state.summary = summary + state.last_summary_ts = now + state.last_topic_seen_ts = now + + def _on_sys_diags(self, sys_key: str, msg: SystemDiagnosticsRaw) -> None: + now = time.monotonic() + with self._lock: + state = self._get_or_create_state(sys_key) + state.diag = msg + state.last_diag_ts = now + state.last_topic_seen_ts = now + + def _on_status_text(self, sys_key: str, msg: String) -> None: + now = time.monotonic() + entry = parse_status_text(msg.data, sys_key, now) + with self._lock: + state = self._get_or_create_state(sys_key) + state.status_entries.append(entry) + purge_status_entries(state.status_entries, now) + state.last_topic_seen_ts = now + + +class DiagCursesApp: + """Curses front-end for vehicle diagnostics.""" + + def __init__(self, node: VehicleDiagNode) -> None: + self._node = node + self._running = True + + def run(self) -> None: + curses.wrapper(self._main) + + def _init_colors(self) -> None: + curses.start_color() + curses.use_default_colors() + curses.init_pair(1, curses.COLOR_GREEN, -1) # OK / DISARMED / INFO + curses.init_pair(2, curses.COLOR_RED, -1) # FAIL / ARMED / ERROR+ + curses.init_pair(3, curses.COLOR_YELLOW, -1) # WARN / OFF + curses.init_pair(4, curses.COLOR_CYAN, -1) # header / INFO + curses.init_pair(5, curses.COLOR_WHITE, -1) # dim / offline + curses.init_pair(6, curses.COLOR_MAGENTA, -1) # NOTICE + + def _main(self, stdscr) -> None: + curses.curs_set(0) + stdscr.nodelay(True) + stdscr.keypad(True) + self._init_colors() + + while self._running and rclpy.ok(): + try: + self._draw(stdscr) + except curses.error: + pass + key = stdscr.getch() + if key in (ord('q'), ord('Q'), 27): + self._running = False + time.sleep(1.0 / DRAW_HZ) + + def _draw(self, stdscr) -> None: + stdscr.erase() + height, width = stdscr.getmaxyx() + now = time.monotonic() + vehicles = self._node.snapshot() + sorted_keys = sorted(vehicles.keys(), key=sys_key_to_id) + + header = f' Vehicle Diagnostics Monitor vehicles:{len(sorted_keys)} Q:quit ' + stdscr.attron(curses.color_pair(4) | curses.A_BOLD) + stdscr.addstr(0, 0, truncate(header.ljust(width - 1), width - 1)) + stdscr.attroff(curses.color_pair(4) | curses.A_BOLD) + + row = 1 + if not sorted_keys: + msg = ' Waiting for vehicles... (need /fc_network/vehicle/sysN/* topics) ' + if row < height - 1: + stdscr.addstr(row, max(0, (width - len(msg)) // 2), truncate(msg, width - 1)) + row += 2 + + vehicle_rows_end = max(row, height - 8) + for sys_key in sorted_keys: + if row >= vehicle_rows_end: + break + state = vehicles[sys_key] + row = self._draw_vehicle_block(stdscr, row, width, state, now) + if row < height - 1: + try: + stdscr.addstr(row, 0, truncate('-' * (width - 1), width - 1)) + except curses.error: + pass + row += 1 + + log_title = ' Status Text (last 30s) ' + if row < height - 1: + stdscr.attron(curses.color_pair(4)) + stdscr.addstr(row, 0, truncate(log_title.ljust(width - 1), width - 1)) + stdscr.attroff(curses.color_pair(4)) + row += 1 + + all_entries: List[StatusTextEntry] = [] + for sys_key in sorted_keys: + all_entries.extend(vehicles[sys_key].status_entries) + all_entries.sort(key=lambda e: e.received_at) + + log_rows = height - row - 1 + if log_rows <= 0: + stdscr.refresh() + return + + if not all_entries: + empty = ' (no messages in last 30s) ' + if row < height - 1: + stdscr.addstr(row, 0, truncate(empty, width - 1)) + else: + visible = all_entries[-log_rows:] + for entry in visible: + if row >= height - 1: + break + self._draw_status_line(stdscr, row, width, entry) + row += 1 + + stdscr.refresh() + + def _draw_vehicle_block( + self, + stdscr, + row: int, + width: int, + state: VehicleDiagState, + now: float, + ) -> int: + summary = state.summary or {} + mode = summary.get('mode_name', 'UNKNOWN') + armed = summary.get('armed', False) + conn = summary.get('connection_type', '?') + socket_id = summary.get('socket_id', -1) + mav_status = summary.get('mav_status', 0) + mav_label = MAV_STATE_LABELS.get(int(mav_status), str(mav_status)) + + last_update = max(state.last_summary_ts, state.last_diag_ts) + age_sec = (now - last_update) if last_update > 0 else -1.0 + stale = age_sec >= 0 and age_sec > STALE_SEC + + armed_str = 'ARMED' if armed else 'DISARMED' + flags = [] + if state.offline: + flags.append('OFFLINE') + if stale and not state.offline: + flags.append('STALE') + flag_str = f" [{' '.join(flags)}]" if flags else '' + + line1 = ( + f' {state.sys_key} (MAV {state.sysid}) {mode} {armed_str} ' + f'{conn} socket:{socket_id} {mav_label} ' + f'updated {format_age(age_sec)}{flag_str}' + ) + if row < stdscr.getmaxyx()[0] - 1: + if state.offline: + stdscr.attron(curses.color_pair(5)) + stdscr.addstr(row, 0, truncate(line1, width - 1)) + stdscr.attroff(curses.color_pair(5)) + else: + stdscr.addstr(row, 0, truncate(line1, width - 1)) + armed_pos = line1.find(armed_str) + if armed_pos >= 0: + pair = curses.color_pair(2) if armed else curses.color_pair(1) + stdscr.attron(pair | curses.A_BOLD) + stdscr.addstr(row, armed_pos, armed_str) + stdscr.attroff(pair | curses.A_BOLD) + row += 1 + + if state.diag is not None and row < stdscr.getmaxyx()[0] - 1: + load_pct = state.diag.mcu_load / 10.0 + drop_pct = state.diag.bus_error_rate / 10.0 + line2 = f' MCU {load_pct:.1f}% CommDrop {drop_pct:.1f}%' + stdscr.addstr(row, 0, truncate(line2, width - 1)) + row += 1 + + sensors = decode_sensors( + state.diag.sensors_install_mask, + state.diag.sensors_enabled_mask, + state.diag.sensors_health_mask, + ) + row = self._draw_sensor_line(stdscr, row, width, sensors) + elif row < stdscr.getmaxyx()[0] - 1: + stdscr.addstr(row, 0, truncate(' (no sys_diags yet)', width - 1)) + row += 1 + + return row + + def _draw_sensor_line( + self, + stdscr, + row: int, + width: int, + sensors: List[Tuple[str, str]], + ) -> int: + max_row, _ = stdscr.getmaxyx() + if not sensors: + if row < max_row - 1: + stdscr.addstr(row, 0, truncate(' Sensors: (none installed)', width - 1)) + return row + 1 + + lines: List[List[Tuple[str, str]]] = [[]] + line_len = len(' Sensors: ') + for label, status in sensors: + token = f'{label}:{status} ' + if lines[-1] and line_len + len(token) >= width - 1: + lines.append([]) + line_len = 2 + lines[-1].append((label, status)) + line_len += len(token) + + for idx, line_items in enumerate(lines): + if row >= max_row - 1: + break + prefix = ' Sensors: ' if idx == 0 else ' ' + col = 0 + stdscr.addstr(row, col, prefix) + col += len(prefix) + for label, status in line_items: + token = f'{label}:{status} ' + if col + len(token) >= width - 1: + stdscr.addstr(row, col, truncate('...', width - col - 1)) + return row + 1 + pair = curses.A_NORMAL + if status == 'OK': + pair = curses.color_pair(1) + elif status == 'FAIL': + pair = curses.color_pair(2) | curses.A_BOLD + elif status == 'OFF': + pair = curses.color_pair(3) + stdscr.attron(pair) + stdscr.addstr(row, col, token) + stdscr.attroff(pair) + col += len(token) + row += 1 + return row + + def _draw_status_line( + self, + stdscr, + row: int, + width: int, + entry: StatusTextEntry, + ) -> None: + sev_label = SEVERITY_LABELS.get(entry.severity, str(entry.severity)) + prefix = f'[{entry.sys_key:5}] [{entry.severity} {sev_label:5}] ' + body = truncate(entry.text, max(0, width - len(prefix) - 2)) + line = prefix + body + + pair = curses.A_NORMAL + if entry.severity <= 3: + pair = curses.color_pair(2) + elif entry.severity == 4: + pair = curses.color_pair(3) + elif entry.severity == 5: + pair = curses.color_pair(6) + elif entry.severity == 6: + pair = curses.color_pair(1) + elif entry.severity == 7: + pair = curses.color_pair(5) + + stdscr.attron(pair) + stdscr.addstr(row, 0, truncate(line, width - 1)) + stdscr.attroff(pair) + + +def main() -> int: + if not sys.stdout.isatty(): + print('vehicleDiagShow requires an interactive terminal (TTY).', file=sys.stderr) + return 1 + + rclpy.init() + node = VehicleDiagNode() + spin_thread = threading.Thread(target=rclpy.spin, args=(node,), daemon=True) + spin_thread.start() + + try: + DiagCursesApp(node).run() + finally: + node.destroy_node() + rclpy.shutdown() + spin_thread.join(timeout=2.0) + + return 0 + + +if __name__ == '__main__': + sys.exit(main())