(temp) 整理檔案與註解

chiyu
Chiyu Chen 3 weeks ago
parent b0f1bd56f5
commit 7ce094d211

@ -1,7 +1,5 @@
這是天雷系統的專案
===
## 功能簡介
1. mavlink 多對多支援平台

@ -38,7 +38,7 @@ class SerialMode(Enum):
NOT_USE = auto() # 不使用
# ====================== Frame Processor 基類與實現 =====================
# ====================== Frame Processor Base and Implementation =====================
class FrameProcessor(ABC):
"""協議處理器基類"""
@ -123,11 +123,9 @@ class XBeeFrameProcessor(FrameProcessor):
def process_outgoing(self, data: bytes) -> bytes:
"""將數據封裝為 XBee API 傳輸幀"""
return XBeeFrameProcessor.encapsulate_data(data)
return XBeeFrameHandler.encapsulate_data(data)
# ====================== XBee Frame Handler =====================
class XBeeFrameHandler:
"""XBee API Frame 處理器"""
@ -200,6 +198,10 @@ class XBeeFrameHandler:
return data[rf_data_start:3 + length]
# ====================== Dongle Command Handler =====================
class ATCommandHandler:
"""AT 指令回應處理器"""
@ -240,9 +242,7 @@ class ATCommandHandler:
"""處理 SL (Serial Number Low)"""
pass
# ====================== Serial Handler =====================
# ================ Serial UDP Socket Object ==============
class SerialHandler(asyncio.Protocol):
"""asyncio.Protocol 用於處理 Serial 收發"""
@ -291,8 +291,6 @@ class SerialHandler(asyncio.Protocol):
)
# ====================== UDP Handler =====================
class UDPHandler(asyncio.DatagramProtocol):
"""asyncio.DatagramProtocol 用於處理 UDP 收發"""
@ -600,7 +598,7 @@ if __name__ == '__main__':
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14571
UDP_REMOTE_PORT = 14561
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
# SERIAL_PORT = '/dev/ttyACM0' # 手動指定

@ -1,45 +0,0 @@
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
# import mavros_msgs.srv
class TalkerNode(Node):
def __init__(self):
start_time = time.time()
super().__init__('talker_node')
end_time = time.time()
print(f"Node initialization took {end_time - start_time:.2f} seconds")
self.publisher_ = self.create_publisher(String, 'hahatest/_1', 10)
self.timer = self.create_timer(1.0, self.timer_callback) # 每秒執行一次
self.get_logger().info('TalkerNode has been started.')
def timer_callback(self):
msg = String()
msg.data = 'Hello, ROS 2!'
self.publisher_.publish(msg)
self.get_logger().info(f'Published: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
node = TalkerNode()
print("Before sleep")
time.sleep(5) # 等待 5 秒鐘
print("After sleep")
try:
start_time = time.time()
while time.time() - start_time < 10: # 持續 10 秒鐘
rclpy.spin_once(node)
time.sleep(1) # 每秒執行一次
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

@ -0,0 +1,188 @@
import time
from pymavlink import mavutil
# # 強制 source_system=255
# master = mavutil.mavlink_connection('udpout:localhost:14561', source_system=255)
# print("Starting Force-Ping Test...")
# while True:
# # 1. 發送 Heartbeat (證明連線活著)
# master.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GCS,
# mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0)
# # 2. 強制發送 PING (target 1, 1)
# # 我們加一個計數器,方便在 tcpdump 裡辨識
# ping_seq = int(time.time()) % 100
# master.mav.ping_send(int(time.time() * 1e6), ping_seq, 1, 1)
# print(f"Sent Heartbeat and PING (seq {ping_seq})")
# # 3. 非阻塞讀取,避免卡死
# msg = master.recv_match(blocking=False)
# if msg:
# print(f"--> Received something: {msg.get_type()}")
# time.sleep(1) # 每秒噴一次
# -------------------------------------------------------------------
# # 改成 udpin這會讓你的程式守在 14561 等模擬器送資料進來
# # 這樣你就能收到模擬器的 Heartbeat建立連線後再發 PING
# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255)
# print("Waiting for simulator heartbeat...")
# # 這一行很重要:先收到 Heartbeatpymavlink 才會設定好 target 地址
# master.wait_heartbeat()
# print(f"Heartbeat from system {master.target_system} component {master.target_component}")
# while True:
# # 既然收到了,現在發 PING 就會順著原路回去
# master.mav.ping_send(int(time.time() * 1e6), 1, master.target_system, master.target_component)
# print("PING sent!")
# msg = master.recv_match(type='PING', blocking=True, timeout=2)
# if msg:
# print(f"Got PING response: {msg}")
# time.sleep(1)
# -------------------------------------------------------------------
# import time
# from pymavlink import mavutil
# # 核心修改:因為 MAVProxy 正在往 14561 噴資料,我們必須「接住」它
# # 使用 udpin 會讓你的程式在 14561 監聽
# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=254)
# print("正在等待 MAVProxy 轉發的 Heartbeat...")
# # 只要這行跑過,代表你跟 MAVProxy 握手成功了
# master.wait_heartbeat()
# print(f"成功連線到系統 {master.target_system}")
# while True:
# # 1. 務必發送 Heartbeat讓 MAVProxy 知道要把回應送回這個 Port
# master.mav.heartbeat_send(
# mavutil.mavlink.MAV_TYPE_GCS,
# mavutil.mavlink.MAV_AUTOPILOT_INVALID,
# 0, 0, 0
# )
# # 2. 修改 target_component 為 1 (飛控核心)
# # 注意:這裡刻意將 seq 設為動態,方便你在 watch PING 裡觀察
# p_seq = int(time.time()) % 255
# master.mav.ping_send(
# int(time.time() * 1e6),
# p_seq,
# 1,
# 1 # <--- 改成 1 試試看!
# )
# print(f"Sent PING seq {p_seq} to 1/1")
# # 3. 讀取所有訊息,看看有沒有 PING 回來
# # 有時候回應的 target_system 可能是 255 (你的 ID)
# msg = master.recv_match(type='PING', blocking=True, timeout=1.0)
# if msg:
# print(f"🎉 成功!收到回應: {msg}")
# time.sleep(1)
# -------------------------------------------------------------------
# import time
# from pymavlink import mavutil
# # 建議一樣用 udpin 監聽 MAVProxy 的輸出
# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255)
# def get_time_ns():
# return int(time.time() * 1e9) # 轉為奈秒
# print("開始測試 TIMESYNC 響應...")
# while True:
# now_ns = get_time_ns()
# # 發送 TIMESYNC 請求
# # tc1=0 代表這是請求ts1=當前時間
# master.mav.timesync_send(0, now_ns)
# # 等待回應
# msg = master.recv_match(type='TIMESYNC', blocking=True, timeout=1.0)
# if msg and msg.tc1 != 0:
# # 收到回應msg.tc1 應該等於我們剛才送出的 now_ns
# rtt_ms = (get_time_ns() - msg.tc1) / 1e6
# print(f"🕒 TIMESYNC RTT: {rtt_ms:.3f} ms")
# break # 測試一次就好
# else:
# print("等待 TIMESYNC 回應中...")
# time.sleep(1)
# -------------------------------------------------------------------
import time
import statistics
from pymavlink import mavutil
# 連接到 MAVProxy 轉發的端口
master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255)
# 測試設定
TARGET_SYS = 2 # 你想測試的載具 ID
SAMPLES = 20 # 取 20 次樣本來算平均和抖動
rtt_history = []
print(f"開始測試對 System {TARGET_SYS} 的通道品質...")
def get_time_ns():
return int(time.time() * 1e9)
try:
while len(rtt_history) < SAMPLES:
ts1_send = get_time_ns()
# 發送 TIMESYNC (V1/V2 格式相容)
# 針對特定系統發送
master.mav.timesync_send(0, ts1_send)
# 等待回應 (過濾目標系統的訊息)
msg = master.recv_match(type='TIMESYNC', blocking=True, timeout=1.0)
if msg and msg.get_srcSystem() == TARGET_SYS and msg.tc1 != 0:
ts1_recv = get_time_ns()
rtt_ns = ts1_recv - msg.tc1
rtt_ms = rtt_ns / 1e6
rtt_history.append(rtt_ms)
print(f"Sample {len(rtt_history)}: RTT = {rtt_ms:.3f} ms")
else:
print("Request timeout or mismatched ID...")
time.sleep(0.1) # 快速採樣
# 4. 計算統計數據
avg_latency = statistics.mean(rtt_history)
std_dev = statistics.stdev(rtt_history) # 這就是 Jitter 的一種表現
max_rtt = max(rtt_history)
min_rtt = min(rtt_history)
print("\n--- 物理通道品質分析報告 ---")
print(f"目標載具 : System {TARGET_SYS}")
print(f"平均延遲 (Avg) : {avg_latency:.3f} ms")
print(f"最小延遲 (Min) : {min_rtt:.3f} ms")
print(f"最大延遲 (Max) : {max_rtt:.3f} ms")
print(f"抖動幅度 (Jitter): {std_dev:.3f} ms (標準差)")
if std_dev > 10:
print("警告:網路抖動過高,可能影響控制穩定性!")
else:
print("通道品質:良好")
except KeyboardInterrupt:
print("測試終止。")

@ -0,0 +1,46 @@
from pymavlink.dialects.v20 import common as mavlink
class NullBuffer:
def write(self, data): pass
def seek(self, pos): pass
def tell(self): return 0
class CapturingBuffer:
def __init__(self):
self.last_data = b''
def write(self, data):
self.last_data = bytes(data)
def seek(self, pos): pass
def tell(self): return 0
# 1. 初始化(僅作為編碼器)
# file 參數可以是任何擁有 write() 方法的物件,這裡用 BytesIO 模擬
# 初始化方法一:使用 BytesIO
# import io
# out_buf = io.BytesIO()
# mav = mavlink.MAVLink(out_buf, srcSystem=1, srcComponent=191)
# 初始化方法二:使用自定義的 NullBuffer
mav = mavlink.MAVLink(NullBuffer(), srcSystem=1, srcComponent=191)
mav.seq = 254
# 2. 建立心跳包並取得二進制數據
# 連續產出「不同」的訊息物件
for i in range(3):
msg = mav.heartbeat_encode(mavlink.MAV_TYPE_GCS, 0, 0, 0, 0)
data = msg.pack(mav)
mav.seq = (mav.seq + 1) & 0xFF
# MAVLink 2 的序列號在第 3 個 Byte (Index 2)
print(f"{i} 次發送, Seq: {data[4]}, Raw: {data.hex()}")
print("分隔線")
buf = CapturingBuffer()
mav_buf2 = mavlink.MAVLink(buf, srcSystem=1, srcComponent=191)
for i in range(3):
mav_buf2.heartbeat_send(mavlink.MAV_TYPE_GCS, 0, 0, 0, 0)
data = buf.last_data
print(f"{i} 次發送, Seq: {data[4]}, Raw: {data.hex()}")

File diff suppressed because it is too large Load Diff

@ -0,0 +1,71 @@
import asyncio
import serial_asyncio
SERIAL_PORT = '/dev/ttyUSB0' # 修改為你的 serial port
SERIAL_BAUDRATE = 115200 # 修改為你的 baudrate
UDP_REMOTE_IP = '192.168.1.100' # 修改為目標 IP
UDP_REMOTE_PORT = 5005 # 修改為目標 port
UDP_LOCAL_PORT = 5006 # 本地 UDP 監聽 port
class SerialToUDP(asyncio.Protocol):
def __init__(self, udp_transport):
self.udp_transport = udp_transport
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
# Serial 收到資料,轉發到 UDP
self.udp_transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def write_to_serial(self, data):
self.transport.write(data)
class UDPToSerial(asyncio.DatagramProtocol):
def __init__(self, serial_proto):
self.serial_proto = serial_proto
def datagram_received(self, data, addr):
# UDP 收到資料,轉發到 Serial
self.serial_proto.write_to_serial(data)
async def main():
loop = asyncio.get_running_loop()
# 定義協議工廠函數
def create_empty_protocol():
return asyncio.DatagramProtocol()
# 建立 UDP1 傳輸
udp_transport, _ = await loop.create_datagram_endpoint(
create_empty_protocol,
local_addr=('0.0.0.0', UDP_LOCAL_PORT)
)
# 建立 Serial 傳輸
serial_proto = SerialToUDP(udp_transport)
def get_serial_protocol():
return serial_proto
_, serial_transport = await serial_asyncio.create_serial_connection(
loop, get_serial_protocol, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
)
# 建立 UDP2 監聽
udp_proto = UDPToSerial(serial_proto)
def get_udp_protocol():
return udp_proto
udp_listen_transport, _ = await loop.create_datagram_endpoint(
get_udp_protocol,
local_addr=('0.0.0.0', UDP_LOCAL_PORT)
)
# 保持運行
try:
await asyncio.Future()
except asyncio.CancelledError:
pass
if __name__ == '__main__':
asyncio.run(main())

@ -1,27 +0,0 @@
#!/bin/bash
# 網站清單
DOMAINS=("google.com" "smarter.nchu.edu.tw")
echo "網站 SSL 憑證剩餘天數:"
echo "---------------------------"
for domain in "${DOMAINS[@]}"; do
end_date=$(echo | openssl s_client -servername "$domain" -connect "$domain:443" 2>/dev/null |
openssl x509 -noout -enddate | cut -d= -f2)
end_timestamp=$(date -d "$end_date" +%s)
now_timestamp=$(date +%s)
remaining_days=$(( (end_timestamp - now_timestamp) / 86400 ))
if [ $remaining_days -lt 0 ]; then
status="已過期 ❌"
elif [ $remaining_days -lt 15 ]; then
status="即將到期 ⚠️"
else
status="正常 ✅"
fi
printf "%-20s 到期日:%-25s 剩餘天數:%3d 天 %s\n" "$domain" "$end_date" "$remaining_days" "$status"
done

@ -0,0 +1,40 @@
import socket
import sys
import select
# 設定來源 IP 與 Port
SRC_IP = '127.0.0.1' # 監聽所有介面
SRC_PORT = 16661 # 請自行修改
# 建立 UDP 監聽 socket
src_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
src_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
src_sock.bind((SRC_IP, SRC_PORT))
print(f"Listening for UDP on {SRC_IP}:{SRC_PORT}")
# 設定目標 Unix socket 路徑
UNIX_SOCKET_PATH = '/tmp/unix_socket_mavlink.sock' # 請自行修改
# 建立 Unix socket 連線
unix_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
unix_sock.connect(UNIX_SOCKET_PATH)
except Exception as e:
print(f"Failed to connect to unix socket: {e}")
sys.exit(1)
while True:
# 使用 select 監聽兩個 socket
readable, _, _ = select.select([src_sock, unix_sock], [], [])
for sock in readable:
if sock is src_sock:
data, addr = src_sock.recvfrom(4096)
if data:
unix_sock.sendall(data)
# print(f"Received UDP data from {addr}: {data}") # debug
# break # debug
elif sock is unix_sock:
data = unix_sock.recv(4096)
if data:
# 回送到最近收到資料的 UDP client
src_sock.sendto(data, addr)
Loading…
Cancel
Save