1. 新增 serial_udp_bitrans.py 該程式連結 serial 的封包 經過轉換後 再向 udp 口丟出 (反之亦然)

2. 其餘檔案只是 名稱與註解修正
chiyu
Chiyu Chen 12 months ago
parent 5769f9ab3b
commit 417d9e8f57

@ -11,7 +11,8 @@
Python
1. pymavlink
2.
2. conda-forge 中的 pyserial-asyncio
3.
ROS2
1. source ~/ros2_humble/install/setup.bash

@ -31,6 +31,11 @@ class mavlink_device():
# p_str += '=====================\n'
return p_str
'''
寫了半天 這個功能應該是 pymalink 本來就有的
去找 pymavlink_util.py mavfile(object)
算了 先擺著吧 之後再看看怎麼整合
'''
def updateComponentPacketCount(self, compid, current_seq, current_type, current_time):
# 這段檢查遺失封包
try:

@ -345,7 +345,6 @@ class mavlink_object():
return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg))
else:
_queue = swap_queues[i-2]
# _queue.put((self.socket_id,timestamp,mavlinkMsg)) # 測試看看 也許不需要別的資訊 只需要封包
_queue.put(mavlinkMsg)
# 處理要送出的封包

@ -0,0 +1,340 @@
import asyncio
import serial_asyncio
# 附加驗證功能
import os
import sys
import serial
import signal
# XBee 模組
from xbee.frame import APIFrame
# =========================
SERIAL_PORT = '/dev/ttyACM0' # serial port
SERIAL_BAUDRATE = 57600 # serial baudrate
UDP_REMOTE_IP = '127.0.0.1' # UDP Target IP
UDP_REMOTE_PORT = 14550 # UDP Target port
# 測試用 只會吃一次資料
DEBUG_MODE = False
# =========================
def check_serial_port():
"""檢查串口是否存在與可用"""
# 檢查設備是否存在
if not os.path.exists(SERIAL_PORT):
print(f"錯誤:串口設備 {SERIAL_PORT} 不存在")
return False
# 檢查是否有權限訪問設備
try:
os.access(SERIAL_PORT, os.R_OK | os.W_OK)
except Exception as e:
print(f"錯誤:無法訪問串口設備 {SERIAL_PORT}{str(e)}")
return False
# 檢查是否被占用
try:
# 嘗試打開串口
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE)
ser.close() # 打開成功後立即關閉
return True
except serial.SerialException as e:
print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}")
return False
except Exception as e:
print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}")
return False
# =========================
class SerialToUDP(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發
def __init__(self, udp_protocol):
self.udp_protocol = udp_protocol
self.first_data = True # 標記是否為第一次收到資料
self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"Serial connection established on {SERIAL_PORT}")
## =====================================
# Serial 收到資料,轉發到 UDP
def data_received(self, data):
# 在 DEBUG 模式下,如果已經處理過數據,則直接返回 # debug
if DEBUG_MODE and self.has_processed:
return
# 標記首次收到資料
if hasattr(self.udp_protocol, 'send_udp'):
if self.first_data:
print(f"First data received from serial, forwarding to UDP: {data[:20]}...")
self.first_data = False
# 轉發數據
self.udp_protocol.send_udp(data)
if DEBUG_MODE: # 測試模式用 # debug
self.has_processed = True
print("[DEBUG] SerialToUDP Mark")
def write_to_serial(self, data):
# 在資料透過 Serial 發送之前進行處理
processed_data = self.encapsulate_data(data)
# 處理失敗就不發送
if processed_data not None:
self.transport.write(processed_data)
def encapsulate_data(self, data):
"""
將數據封裝為 XBee API 傳輸幀
使用 XBee API 格式封裝數據:
- 傳輸請求幀 (0x10)
- 使用廣播地址
- 添加適當的頭部和校驗和
"""
## 方法一
# 設置幀參數
frame_id = 0x01 # 幀識別碼,用於確認
frame_type = 0x10 # 幀類型:傳輸請求
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 64位目標地址 (廣播)
dest_addr16 = b'\xFF\xFE' # 16位目標地址 (未知/廣播)
broadcast_radius = 0x00 # 廣播半徑 (0 = 最大)
options = 0x00 # 傳輸選項
# 構建幀數據部分
frame_data = bytearray()
frame_data.append(frame_type) # 添加幀類型
frame_data.append(frame_id) # 添加幀 ID
frame_data.extend(dest_addr64) # 添加 64 位目標地址
frame_data.extend(dest_addr16) # 添加 16 位目標地址
frame_data.append(broadcast_radius) # 添加廣播半徑
frame_data.append(options) # 添加選項
frame_data.extend(data) # 添加實際數據內容
# 計算校驗和 (0xFF 減去所有字節的總和的最低 8 位)
checksum = 0xFF - (sum(frame_data) & 0xFF)
# 構建完整的 API 幀
# 起始分隔符 + 長度 (兩字節) + 幀數據 + 校驗和
complete_frame = bytearray()
complete_frame.append(0x7E) # 添加起始分隔符
complete_frame.extend(struct.pack(">H", len(frame_data))) # 添加長度 (高位優先)
complete_frame.extend(frame_data) # 添加幀數據
complete_frame.append(checksum) # 添加校驗和
return bytes(complete_frame)
## 方法二
# frame_id=0x01
# frame_type = 0x10
# dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
# dest_addr16 = b'\xFF\xFE'
# broadcast_radius = 0x00
# options = 0x00
# frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
# frame += dest_addr64 + dest_addr16
# frame += struct.pack(">BB", broadcast_radius, options) + data
# checksum = 0xFF - (sum(frame) & 0xFF)
# return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發
def __init__(self):
self.serial_transport = None
self.transport = None
self.remote_addr = None # 儲存動態獲取的遠程地址
self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport):
self.transport = transport
print("UDP transport ready. Waiting for serial data before sending...")
def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport
## =====================================
# UDP 收到資料
def datagram_received(self, data, addr):
# 儲存對方的地址(這樣就能向同一個來源回傳數據)
# self.remote_addr = addr
# print(f"Received UDP data from {addr}, setting as remote address")
# 在 DEBUG 模式下,如果已經處理過數據,則直接返回
if DEBUG_MODE and self.has_processed:
return
if self.serial_transport:
self.serial_transport.write_to_serial(data)
if DEBUG_MODE: # 測試模式用
self.has_processed = True
print("[DEBUG] UDPHandler Mark")
def send_udp(self, data):
# 發送資料到 UDP
# if self.transport:
# # 如果有已知的回應地址,使用該地址
# if self.remote_addr:
# self.transport.sendto(data, self.remote_addr)
# # print(f"Sending to dynamic address: {self.remote_addr}")
# else:
# # 否則使用預設地址
# self.transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
# print(f"Sending first UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}")
if self.transport:
# 只有第一次或地址改變時才顯示
# if not hasattr(self, '_last_sent_addr') or self._last_sent_addr != (UDP_REMOTE_IP, UDP_REMOTE_PORT):
# print(f"Sending UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}")
# self._last_sent_addr = (UDP_REMOTE_IP, UDP_REMOTE_PORT)
# 在透過 UDP 發送數據之前進行解封裝
decoded_data = self.decapsulate_data(data)
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def decapsulate_data(self, data):
# 這裡可以根據需要進行數據解封裝
## 方法一
try:
# 創建一個 API 幀處理器
api_frame = APIFrame(escaped=True)
# 嘗試解析數據
api_frame.fill(data)
# 如果數據不完整,直接返回原始數據
if not api_frame.is_complete():
return data
# 解析幀內容
parsed_data = api_frame.parse()
# 提取有用數據
if parsed_data:
frame_data = parsed_data.get('rf_data', None)
if frame_data:
return frame_data
return data
## 方法二 - 手動解析
# try:
# # XBee API 幀格式:
# # 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節)
# # 檢查幀起始符 (0x7E)
# if not data or len(data) < 5 or data[0] != 0x7E:
# return data
# # 獲取數據長度 (不包括校驗和)
# length = (data[1] << 8) + data[2]
# # 檢查幀完整性
# if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和
# return data
# # 提取API標識符和數據
# frame_type = data[3]
# frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中
# # 根據不同的幀類型進行處理
# if frame_type == 0x90: # 例如,這是"接收數據包"類型
# # 提取實際有效載荷
# # 對於接收數據包,格式通常是:
# # API標識符(1) + 64位地址(8) + 16位地址(2) + 選項(1) + 數據
# if len(frame_data) >= 12: # 確保數據長度足夠
# payload = frame_data[11:] # 前11字節是地址和選項
# return payload
# return data
# 如果無法提取 則回傳 None
except Exception as e:
print(f"手動解析 XBee 數據錯誤: {e}")
return None
async def main():
# 先檢查串口是否可用
if not check_serial_port():
print("程式終止:串口檢查失敗")
return
loop = asyncio.get_running_loop()
# 設置終止處理
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(shutdown(loop))
)
# 建立單一 UDP 端點處理收發
udp_handler = UDPHandler()
# 建立 UDP 傳輸,不指定接收端口,讓系統自動分配
try:
udp_transport, protocol = await loop.create_datagram_endpoint(
lambda: udp_handler,
local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口
)
except Exception as e:
print(f"無法創建 UDP 端點:{str(e)}")
return
# 獲取系統分配的本地端口
sock = udp_transport.get_extra_info('socket')
local_addr = sock.getsockname()
print(f"UDP listening on {local_addr[0]}:{local_addr[1]}")
# 建立 Serial 傳輸,將 UDP 處理器傳給它
try:
serial_proto = SerialToUDP(udp_handler)
_, serial_transport = await serial_asyncio.create_serial_connection(
loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
)
except Exception as e:
print(f"無法建立串口連接:{str(e)}")
udp_transport.close()
return
print(f"Waiting for data from serial port to initiate UDP communication...")
# 保持運行
try:
await asyncio.Future()
except asyncio.CancelledError:
pass
async def shutdown(loop):
"""關閉程序"""
print("Shutting down...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程式被使用者中斷")
except Exception as e:
print(f"程式執行錯誤:{str(e)}")

@ -1,33 +0,0 @@
import socket
import os
# socket file path
SOCKET_PATH = "/tmp/unix_socket_example"
# 若檔案存在就先刪除
if os.path.exists(SOCKET_PATH):
os.remove(SOCKET_PATH)
# 建立 socket
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# 綁定 socket 到檔案
server.bind(SOCKET_PATH)
server.listen(1)
print("🔌 Server waiting for connection...")
# 等待 client 連線
conn, _ = server.accept()
print("✅ Client connected.")
while True:
data = conn.recv(1024)
if not data:
break
print("📥 Received:", data.decode())
conn.sendall(b"Echo: " + data)
conn.close()
server.close()
os.remove(SOCKET_PATH)

@ -0,0 +1,94 @@
import socket
import os
# # socket file path
# SOCKET_PATH = "/tmp/unix_socket_example"
# # 若檔案存在就先刪除
# if os.path.exists(SOCKET_PATH):
# os.remove(SOCKET_PATH)
# # 建立 socket
# server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# # 綁定 socket 到檔案
# server.bind(SOCKET_PATH)
# server.listen(1)
# print("🔌 Server waiting for connection...")
# # 等待 client 連線
# conn, _ = server.accept()
# print("✅ Client connected.")
# while True:
# data = conn.recv(1024)
# if not data:
# break
# print("📥 Received:", data.decode())
# conn.sendall(b"Echo: " + data)
# conn.close()
# server.close()
# os.remove(SOCKET_PATH)
# =====================
# from pymavlink import mavutil
# def create_unix_socket_connection():
# # 建立一個 mavtcpin 實例
# mav_conn = mavutil.mavtcpin("127.0.0.250:9999", source_system=1, source_component=1)
# # 替換底層 socket
# # 關閉原有的 socket
# mav_conn.listen.close()
# # 創建 Unix socket 並替換
# unix_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# unix_socket_path = "/tmp/unix_socket_mavlink.sock"
# # 確保先前的 unix socket 已被移除
# if os.path.exists(unix_socket_path):
# os.remove(unix_socket_path)
# # 綁定並設定 Unix socket
# unix_socket.bind(unix_socket_path)
# unix_socket.listen(1)
# unix_socket.setblocking(0)
# mavutil.set_close_on_exec(unix_socket.fileno())
# # 替換 listen socket
# mav_conn.listen = unix_socket
# mav_conn.fd = unix_socket.fileno()
# # mav_conn.port = unix_socket
# return mav_conn
# a = create_unix_socket_connection()
# # print(a.port)
# ===============================
import mavunixin
mav_conn = mavunixin.mavunixin("unix:/tmp/unix_socket_mavlink.sock", source_system=1, source_component=1)
import time
print("🔌 Server waiting for connection...")
while True:
time.sleep(1)
mavlinkMsg = mav_conn.recv_msg()
if mavlinkMsg is not None:
print("📥 Received:", mavlinkMsg)
# a.send(mavlinkMsg)
# print("📤 Server replied:", data.decode())
else:
print("No message received.")
Loading…
Cancel
Save