(update) ntrip_client.py

Introduced threading locks and stop event for safer socket management. Updated the receive loop to handle connection interruptions.
Chiyu Chen 2 weeks ago
parent 2c7f2afc45
commit cc9d005392

@ -2,6 +2,7 @@ import socket
import base64 import base64
import threading import threading
import time import time
from typing import Optional, Tuple
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
@ -21,7 +22,7 @@ class GGA_stream():
return f"{value:02X}" return f"{value:02X}"
@classmethod @classmethod
def decimal_to_nmea_dm(cls, deg: float, *, is_latitude: bool) -> tuple[str, str]: def decimal_to_nmea_dm(cls, deg: float, *, is_latitude: bool) -> Tuple[str, str]:
"""十進位度 → NMEA 的 (d)dmm.mmmm 與半球字元。""" """十進位度 → NMEA 的 (d)dmm.mmmm 與半球字元。"""
if is_latitude: if is_latitude:
hemi = "N" if deg >= 0 else "S" hemi = "N" if deg >= 0 else "S"
@ -82,7 +83,7 @@ class NtripClientNode(Node):
self.declare_parameter('gga_lat', None) self.declare_parameter('gga_lat', None)
self.declare_parameter('gga_lon', None) self.declare_parameter('gga_lon', None)
self.declare_parameter('gga_alt_m', 100) self.declare_parameter('gga_alt_m', 100)
self.declare_parameter('gga_interval_sec', 300) self.declare_parameter('gga_interval_sec', 60)
rtcm_qos = QoSProfile( rtcm_qos = QoSProfile(
history=HistoryPolicy.KEEP_LAST, history=HistoryPolicy.KEEP_LAST,
@ -92,7 +93,9 @@ class NtripClientNode(Node):
) )
self.publisher_ = self.create_publisher(RTCM, '/fc_network/rtcm_input', rtcm_qos) self.publisher_ = self.create_publisher(RTCM, '/fc_network/rtcm_input', rtcm_qos)
self._running = True self._stop_event = threading.Event()
self._sock_lock = threading.Lock()
self._active_sock: Optional[socket.socket] = None
self._thread = threading.Thread(target=self._receive_loop, daemon=True) self._thread = threading.Thread(target=self._receive_loop, daemon=True)
self._thread.start() self._thread.start()
@ -100,6 +103,15 @@ class NtripClientNode(Node):
# ── NTRIP 連線 + RTCM 接收迴圈 ────────────────────────────── # ── NTRIP 連線 + RTCM 接收迴圈 ──────────────────────────────
def _request_stop(self):
self._stop_event.set()
with self._sock_lock:
if self._active_sock is not None:
try:
self._active_sock.close()
except OSError:
pass
def _receive_loop(self): def _receive_loop(self):
backoff = self.RECONNECT_BASE_SEC backoff = self.RECONNECT_BASE_SEC
@ -113,10 +125,12 @@ class NtripClientNode(Node):
self.get_logger().error('mountpoint 參數未設定...') self.get_logger().error('mountpoint 參數未設定...')
return return
while self._running: while not self._stop_event.is_set():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10) sock.settimeout(10)
with self._sock_lock:
self._active_sock = sock
self.gga_lat = self.get_parameter('gga_lat').value self.gga_lat = self.get_parameter('gga_lat').value
self.gga_lon = self.get_parameter('gga_lon').value self.gga_lon = self.get_parameter('gga_lon').value
@ -154,18 +168,26 @@ class NtripClientNode(Node):
self._read_stream(sock) self._read_stream(sock)
except Exception as e: except Exception as e:
if self._running: if self._stop_event.is_set():
break
self.get_logger().warn(f'連線中斷: {e}{backoff:.0f}s 後重連') self.get_logger().warn(f'連線中斷: {e}{backoff:.0f}s 後重連')
time.sleep(backoff) if self._stop_event.wait(timeout=backoff):
break
backoff = min(backoff * 2, self.RECONNECT_MAX_SEC) backoff = min(backoff * 2, self.RECONNECT_MAX_SEC)
finally: finally:
with self._sock_lock:
if self._active_sock is sock:
self._active_sock = None
try:
sock.close() sock.close()
except OSError:
pass
def _read_stream(self, sock: socket.socket): def _read_stream(self, sock: socket.socket):
buf = b'' buf = b''
last_gga_time = 0.0 last_gga_time = 0.0
while self._running: while not self._stop_event.is_set():
now = time.time() now = time.time()
if self.gga_on and (now - last_gga_time >= self.gga_interval_sec): if self.gga_on and (now - last_gga_time >= self.gga_interval_sec):
line = GGA_stream.send_gga(sock, self.gga_lat, self.gga_lon, self.gga_alt_m) line = GGA_stream.send_gga(sock, self.gga_lat, self.gga_lon, self.gga_alt_m)
@ -175,8 +197,16 @@ class NtripClientNode(Node):
try: try:
chunk = sock.recv(4096) chunk = sock.recv(4096)
except socket.timeout: except socket.timeout:
if self._stop_event.is_set():
break
continue continue
except OSError:
if self._stop_event.is_set():
break
raise
if not chunk: if not chunk:
if self._stop_event.is_set():
break
raise ConnectionError("stream ended. Empty Chunk") raise ConnectionError("stream ended. Empty Chunk")
buf += chunk buf += chunk
@ -203,8 +233,11 @@ class NtripClientNode(Node):
self.publisher_.publish(msg) self.publisher_.publish(msg)
def destroy_node(self): def destroy_node(self):
self._running = False self._request_stop()
self._thread.join(timeout=3.0) if self._thread.is_alive():
self._thread.join(timeout=5.0)
if self._thread.is_alive():
self.get_logger().warn('receive thread 未在時限內結束')
super().destroy_node() super().destroy_node()

@ -34,6 +34,8 @@ python -m fc_network_module.fc_network_module.ntrip_client --ros-args -p mountpo
python -m fc_network_module.fc_network_module.ntrip_client --ros-args -p host:=210.241.63.193 -p port:=81 -p mountpoint:=2020_GNSS -p username:=uavlab6061 -p password:=iamsupersmart python -m fc_network_module.fc_network_module.ntrip_client --ros-args -p host:=210.241.63.193 -p port:=81 -p mountpoint:=2020_GNSS -p username:=uavlab6061 -p password:=iamsupersmart
python3 -m fc_network_module.fc_network_module.ntrip_client --ros-args -p host:=210.241.63.193 -p port:=81 -p mountpoint:=2020_GNSS -p username:=uavlab6061 -p password:=iamsupersmart -p gga_on:=True -p gga_lat:=24.155792000 -p gga_lon:=120.630679000 -p gga_interval_sec:=60
export ROS_DOMAIN_ID=13 export ROS_DOMAIN_ID=13
ros2 daemon stop ros2 daemon stop
ros2 daemon start ros2 daemon start

Loading…
Cancel
Save