From e4f0b2dc938127d733c6183d08307ad4c251307a Mon Sep 17 00:00:00 2001 From: wenchun Date: Mon, 19 May 2025 12:21:06 +0800 Subject: [PATCH] =?UTF-8?q?Matlab=E8=88=87Gazebo=E7=9A=84=E9=96=89?= =?UTF-8?q?=E7=92=B0=E6=A8=A1=E6=93=AC=E6=89=80=E9=9C=80=E7=A8=8B=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/unitdev03/0519/close_loop_multi_1.py | 297 ++++++++++ src/unitdev03/0519/enemy_status.py | 340 ++++++++++++ src/unitdev03/0519/test_transform.py | 663 +++++++++++++++++++++++ 3 files changed, 1300 insertions(+) create mode 100644 src/unitdev03/0519/close_loop_multi_1.py create mode 100644 src/unitdev03/0519/enemy_status.py create mode 100644 src/unitdev03/0519/test_transform.py diff --git a/src/unitdev03/0519/close_loop_multi_1.py b/src/unitdev03/0519/close_loop_multi_1.py new file mode 100644 index 0000000..5542372 --- /dev/null +++ b/src/unitdev03/0519/close_loop_multi_1.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python3 +from pymavlink import mavutil +import socket +import re +import time +import math +import threading +import json +import os +import datetime +import queue +import argparse +from select import select + +class Drone: + def __init__(self, instance_id, mavlink_port, udp_port): + self.instance_id = instance_id + self.mavlink_port = mavlink_port + self.udp_port = udp_port + self.running = True + self.connection = None + + # 創建數據緩存 + self.latest_position = None + + # 創建日誌佇列 + self.log_queue = queue.Queue() + + # 創建日誌文件目錄 + self.log_dir = "drone_logs" + if not os.path.exists(self.log_dir): + os.makedirs(self.log_dir) + # 創建一個含時間戳的日誌文件名 + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + self.log_file = os.path.join(self.log_dir, f"drone_{self.instance_id}_{timestamp}.log") + + # 通訊統計 + self.packet_counter = 0 + self.start_time = time.time() + self.packets_per_second = 0 + + def connect(self): + """僅建立MAVLink連接""" + print(f"Drone {self.instance_id}: 建立MAVLink連接 (Port {self.mavlink_port})...") + connection_string = f"udp:127.0.0.1:{self.mavlink_port}" + self.connection = mavutil.mavlink_connection(connection_string) + self.connection.wait_heartbeat() + print(f"Drone {self.instance_id}: MAVLink連接已建立") + + def set_guided_mode(self): + """切換到GUIDED模式""" + self.connection.mav.set_mode_send( + self.connection.target_system, + mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, + 4 # GUIDED模式ID + ) + print(f"Drone {self.instance_id}: 已切換到GUIDED模式") + + def arm(self): + """解鎖無人機""" + self.connection.mav.command_long_send( + self.connection.target_system, + self.connection.target_component, + mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, + 0, 1, 0, 0, 0, 0, 0, 0 # 1表示解鎖 + ) + print(f"Drone {self.instance_id}: 無人機已解鎖") + + def takeoff(self, altitude=3.0): + """起飛到指定高度""" + self.connection.mav.command_long_send( + self.connection.target_system, + self.connection.target_component, + mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, + 0, 0, 0, 0, 0, 0, 0, altitude + ) + print(f"Drone {self.instance_id}: 起飛命令已發送,目標高度{altitude}公尺") + + def mavlink_listener(self): + """事件驅動方式監聽MAVLink訊息""" + # 獲取底層socket + mavlink_socket = self.connection.port.fileno() + + while self.running: + # 使用select等待數據,無須sleep + readable, _, _ = select([mavlink_socket], [], [], 0.001) + if readable: + msg = self.connection.recv_msg() + if msg and msg.get_type() == 'LOCAL_POSITION_NED': + self.latest_position = msg + + def log_worker(self): + """事件驅動方式處理日誌寫入""" + while self.running: + if not self.log_queue.empty(): + try: + # 批量處理日誌 + batch = [] + while not self.log_queue.empty() and len(batch) < 100: + try: + entry = self.log_queue.get_nowait() + batch.append(entry) + except queue.Empty: + break + + # 寫入日誌 + if batch: + with open(self.log_file, 'a', encoding='utf-8') as f: + for entry in batch: + f.write(json.dumps(entry, ensure_ascii=False) + "\n") + except Exception as e: + print(f"Drone {self.instance_id}: 日誌寫入錯誤: {e}") + + # 每秒計算一次統計數據 + now = time.time() + if now - self.start_time >= 1.0: + self.packets_per_second = self.packet_counter + print(f"Drone {self.instance_id} - 每秒封包數: {self.packets_per_second}") + self.packet_counter = 0 + self.start_time = now + + # 使用select等待短暫時間而非sleep + select([], [], [], 0.01) # 10ms的微小等待 + + def log_packet(self, packet_data, packet_num): + """將數據加入日誌佇列""" + try: + timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + log_entry = { + "timestamp": timestamp, + "packet_number": packet_num, + "data": packet_data + } + self.log_queue.put(log_entry) + except Exception as e: + print(f"Drone {self.instance_id}: 佇列添加錯誤: {e}") + + def run(self): + """使用事件驅動方式處理UDP通訊""" + # 啟動背景線程 + mavlink_thread = threading.Thread(target=self.mavlink_listener) + mavlink_thread.daemon = True + mavlink_thread.start() + + log_thread = threading.Thread(target=self.log_worker) + log_thread.daemon = True + log_thread.start() + + # 設置UDP服務器 + server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 262144) # 增大接收緩衝區 + server_socket.bind(('0.0.0.0', self.udp_port)) + server_socket.setblocking(0) # 非阻塞模式 + + print(f"Drone {self.instance_id}: UDP服務器已啟動,監聽端口{self.udp_port}") + print(f"Drone {self.instance_id}: 日誌將保存至 {self.log_file}") + + # 主事件循環 + while self.running: + # 使用select監聽UDP + readable, _, _ = select([server_socket], [], [], 0.001) + + if readable: + try: + data, _ = server_socket.recvfrom(4096) + decoded_data = data.decode('utf-8') + + # 增加計數器 + self.packet_counter += 1 + total_packets = self.packet_counter + + try: + # 解析JSON + json_data = json.loads(decoded_data) + + # 寫入日誌 + self.log_packet(json_data, total_packets) + + # 提取速度數據 + vel_match = re.search(r'"next_velocity":\[([-\d\.\,]+)\]', decoded_data) + if vel_match: + velocity_str = vel_match.group(1) + velocity = [float(x) for x in velocity_str.split(',')] + vx, vy, vz = velocity + + # 設置type_mask - 僅使用速度控制 + type_mask = ( + mavutil.mavlink.POSITION_TARGET_TYPEMASK_X_IGNORE | + mavutil.mavlink.POSITION_TARGET_TYPEMASK_Y_IGNORE | + mavutil.mavlink.POSITION_TARGET_TYPEMASK_Z_IGNORE | + mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE | + mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE | + mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE | + mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE | + mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_RATE_IGNORE + ) + + # 直接發送速度指令,不需等待位置數據 + self.connection.mav.set_position_target_local_ned_send( + 0, + self.connection.target_system, + self.connection.target_component, + mavutil.mavlink.MAV_FRAME_LOCAL_NED, + type_mask, + 0, 0, 0, # 位置 + vx, vy, -vz, # 速度 + 0, 0, 0, # 加速度 + 0, 0 # 航向 + ) + print(f"Drone {self.instance_id}: 發送速度指令: vx={vx}, vy={vy}, vz={-vz}") + + except json.JSONDecodeError as je: + print(f"Drone {self.instance_id}: JSON解析錯誤: {je}") + self.log_packet({"raw_data": decoded_data}, total_packets) + + except Exception as e: + print(f"Drone {self.instance_id}: 處理錯誤: {e}") + + # 清理資源 + server_socket.close() + print(f"Drone {self.instance_id}: 線程結束") + +def main(): + # 解析命令行參數 + parser = argparse.ArgumentParser(description='多無人機控制程序') + parser.add_argument('--drones', type=str, default='1,2,3', + help='要控制的無人機ID,用逗號分隔 (例如: 1,2,3)') + args = parser.parse_args() + + # 所有可用的無人機配置 + all_drone_configs = [ + (1, 14550, 5003), + (2, 14560, 5013), + (3, 14570, 5023) + ] + + # 解析要使用的無人機ID + drone_ids = [int(id_str) for id_str in args.drones.split(',')] + drone_configs = [config for config in all_drone_configs if config[0] in drone_ids] + + drones = [] + threads = [] + + try: + # 創建所有無人機實例 + for instance_id, mavlink_port, udp_port in drone_configs: + drone = Drone(instance_id, mavlink_port, udp_port) + drones.append(drone) + + # 步驟1: 所有無人機連接 + for drone in drones: + drone.connect() + + # 適當延遲確保連接穩定 + time.sleep(1) + + # 步驟2: 所有無人機同時切換到GUIDED模式 + for drone in drones: + drone.set_guided_mode() + + time.sleep(1) + + # 步驟3: 所有無人機同時解鎖 + for drone in drones: + drone.arm() + + time.sleep(1) + + # 步驟4: 所有無人機同時起飛 + for drone in drones: + drone.takeoff() + + # 啟動監聽線程 + for drone in drones: + thread = threading.Thread(target=drone.run) + thread.daemon = True + threads.append(thread) + thread.start() + + print(f"已啟動 {len(drones)} 台無人機 (ID: {args.drones}),按Ctrl+C結束程序") + + # 主線程事件循環 + while True: + select([], [], [], 0.1) # 使用select代替time.sleep + + except KeyboardInterrupt: + print("正在停止所有無人機...") + for drone in drones: + drone.running = False + + for thread in threads: + thread.join(timeout=2) + + print("程序已結束") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/unitdev03/0519/enemy_status.py b/src/unitdev03/0519/enemy_status.py new file mode 100644 index 0000000..c4b96a6 --- /dev/null +++ b/src/unitdev03/0519/enemy_status.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python3 + +from pymavlink import mavutil +import time +import sys +import json +import socket +import threading +import os +from datetime import datetime + +# 固定初始位置設定 (5,5,0) +INITIAL_POSITION = [5, 5, 0] + +# MAVLink 連接設定 +connection_string = 'udp:127.0.0.1:14553' + +# 網路傳輸設定 +target_ip = '140.120.31.130' +target_port = 9053 + +# 建立 UDP socket 用於傳送數據 +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + +# 封包編號計數器 +packet_counter = 0 + +# 共享變數 - 存儲最新位置 +current_position = None +initial_position = INITIAL_POSITION.copy() +position_lock = threading.Lock() + +# 連接到無人機 +print(f"嘗試連接到 MAVLink: {connection_string}") +try: + connection = mavutil.mavlink_connection(connection_string, autoreconnect=True) + connection.wait_heartbeat() + print(f"成功連接到系統 {connection.target_system}!") +except Exception as e: + print(f"MAVLink 連接失敗: {e}") + sys.exit(1) + +print(f"使用指定初始位置: 北={initial_position[0]}, 東={initial_position[1]}, 下={initial_position[2]}") + +# 請求位置數據流 +connection.mav.request_data_stream_send( + connection.target_system, + connection.target_component, + mavutil.mavlink.MAV_DATA_STREAM_POSITION, + 10, + 1 +) + +# 日誌記錄函數 +def start_logging(): + """創建並開啟日誌文件""" + log_dir = "logs" + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file = os.path.join(log_dir, f"drone_log_{timestamp}.csv") + + file = open(log_file, "w") + file.write("packet_id,timestamp,x,y,z,vx,vy,vz\n") # CSV標頭 + print(f"日誌記錄已開始: {log_file}") + return file + +# 控制函數 +def set_guided_mode(): + """設置為 GUIDED 模式""" + connection.mav.command_long_send( + connection.target_system, + connection.target_component, + mavutil.mavlink.MAV_CMD_DO_SET_MODE, + 0, + mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, + 4, + 0, 0, 0, 0, 0 + ) + ack = connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3) + return ack and ack.result == mavutil.mavlink.MAV_RESULT_ACCEPTED + +def arm_throttle(): + """解鎖油門""" + connection.mav.command_long_send( + connection.target_system, + connection.target_component, + mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, + 0, 1, 0, 0, 0, 0, 0, 0 + ) + ack = connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3) + return ack and ack.result == mavutil.mavlink.MAV_RESULT_ACCEPTED + +def takeoff(altitude): + """起飛到指定高度""" + connection.mav.command_long_send( + connection.target_system, + connection.target_component, + mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, + 0, 0, 0, 0, 0, 0, 0, altitude + ) + ack = connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3) + return ack and ack.result == mavutil.mavlink.MAV_RESULT_ACCEPTED + +def set_speed_limit(speed_type, speed): + """ + 設置速度限制 + + Parameters: + ----------- + speed_type : int + 速度類型 (0=空速, 1=地速, 2=爬升速度, 3=下降速度) + speed : float + 速度值 (米/秒) + """ + connection.mav.command_long_send( + connection.target_system, + connection.target_component, + mavutil.mavlink.MAV_CMD_DO_CHANGE_SPEED, + 0, # 確認 + speed_type, # 參數1: 速度類型 + speed, # 參數2: 速度 (m/s) + -1, # 參數3: 油門 (-1 表示不變) + 0, 0, 0, 0 # 參數4-7: 未使用 + ) + ack = connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3) + success = ack and ack.result == mavutil.mavlink.MAV_RESULT_ACCEPTED + if success: + print(f"已設置速度限制為 {speed:.2f} m/s") + else: + print("設置速度限制失敗") + return success + +def goto_position(north, east, down): + """前往指定位置 (世界座標系)""" + # 將世界座標轉換為相對座標 + rel_north = north - initial_position[0] + rel_east = east - initial_position[1] + rel_down = down - initial_position[2] + + connection.mav.set_position_target_local_ned_send( + 0, + connection.target_system, + connection.target_component, + mavutil.mavlink.MAV_FRAME_LOCAL_NED, + 0b0000111111111000, + rel_north, rel_east, rel_down, + 0, 0, 0, + 0, 0, 0, + 0, 0 + ) + # print(f"相對位置: 北={rel_north:.2f}, 東={rel_east:.2f}, 下={rel_down:.2f}") + +def get_current_position(): + """獲取存儲的當前位置""" + with position_lock: + if current_position is None: + return None + return current_position.copy() + +def control_interface(): + """命令行控制界面""" + print("無人機控制介面已啟動,輸入命令:") + print(" arm - 解鎖油門") + print(" takeoff [高度] - 起飛") + print(" goto [北] [東] [高度] - 前往位置 (世界座標系)") + print(" pos - 顯示當前位置 (相對座標和世界座標)") + print(" init - 顯示初始位置") + print(" exit - 退出控制") + + while True: + try: + cmd = input("命令> ").strip().split() + if not cmd: + continue + + if cmd[0] == "arm": + print("嘗試解鎖油門...") + if set_guided_mode() and arm_throttle(): + print("成功解鎖油門!") + else: + print("解鎖失敗") + + elif cmd[0] == "takeoff": + alt = float(cmd[1]) if len(cmd) > 1 else 3.0 + print(f"嘗試起飛到 {alt} 米高...") + if takeoff(alt): + print("起飛命令已發送") + else: + print("起飛失敗") + + elif cmd[0] == "goto": + if len(cmd) >= 4: + north = float(cmd[1]) + east = float(cmd[2]) + down = -float(cmd[3]) # 高度是負的下 + print(f"前往世界位置: 北:{north} 東:{east} 下:{-down} 米") + goto_position(north, east, down) + else: + print("用法: goto [北] [東] [高度]") + + elif cmd[0] == "pos": + pos = get_current_position() + if pos: + rel_x, rel_y, rel_z = pos + # 計算世界座標 + world_x = initial_position[0] + rel_x + world_y = initial_position[1] + rel_y + world_z = initial_position[2] + rel_z + # NED 轉 ENU + enu_x = rel_y + enu_y = rel_x + enu_z = -rel_z + + print("\n當前位置:") + #print(f" 相對座標 (NED): 北={rel_x:.2f}m, 東={rel_y:.2f}m, 下={rel_z:.2f}m") + print(f" 世界座標 : 北={world_x:.2f}m, 東={world_y:.2f}m, 下={-world_z:.2f}m") + #print(f" 相對座標 (ENU): 東={enu_x:.2f}m, 北={enu_y:.2f}m, 上={enu_z:.2f}m") + #print(f" 高度: {-rel_z:.2f}m") + else: + print("尚未獲取到位置信息") + + elif cmd[0] == "speed": + if len(cmd) > 1: + speed = float(cmd[1]) + set_speed_limit(1, speed) # 1 = 地速 + else: + print("用法: speed [速度m/s]") + + elif cmd[0] == "climbspeed": + if len(cmd) > 1: + speed = float(cmd[1]) + set_speed_limit(2, speed) # 2 = 爬升速度 + set_speed_limit(3, speed) # 3 = 下降速度 + else: + print("用法: climbspeed [速度m/s]") + + elif cmd[0] == "init": + print(f"初始位置 (NED): 北={initial_position[0]:.2f}m, 東={initial_position[1]:.2f}m, 下={initial_position[2]:.2f}m") + + elif cmd[0] == "exit": + print("退出控制介面") + break + + else: + print(f"未知命令: {cmd[0]}") + except Exception as e: + print(f"命令執行錯誤: {e}") + +# 數據傳輸線程函數 +def data_sending_thread(): + global packet_counter, current_position + + # 開啟日誌文件 + log_file = start_logging() + + try: + print(f"開始接收數據並傳送到 {target_ip}:{target_port}...") + while True: + msg = connection.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=1) + if msg is not None: + # 更新當前位置(線程安全) + with position_lock: + current_position = [msg.x, msg.y, msg.z] + + # 增加封包編號 + packet_counter += 1 + + # 計算世界座標 + world_x = initial_position[0] + msg.x + world_y = initial_position[1] + msg.y + world_z = initial_position[2] + msg.z + + # NED 轉換為 ENU 並使用世界座標 + enu_position = { + "x": world_x, # 東 (從 NED 的 x) + "y": world_y, # 北 (從 NED 的 y) + "z": -world_z # 上 (從 NED 的 -z) + } + + enu_velocity = { + "vx": msg.vy, # 東向速度 + "vy": msg.vx, # 北向速度 + "vz": -msg.vz # 上向速度 + } + + # 創建JSON對象 (使用世界座標) + json_data = { + "packet_id": packet_counter, + "timestamp": msg.time_boot_ms, + "enemy_position": enu_position, + "enemy_velocity": enu_velocity + } + + # 寫入日誌文件 (CSV格式) + try: + log_file.write(f"{packet_counter},{msg.time_boot_ms},{enu_position['x']},{enu_position['y']},{enu_position['z']},{enu_velocity['vx']},{enu_velocity['vy']},{enu_velocity['vz']}\n") + log_file.flush() # 確保數據即時寫入文件 + except Exception as e: + print(f"寫入日誌失敗: {e}") + + json_str = json.dumps(json_data) + + try: + sock.sendto(json_str.encode('utf-8'), (target_ip, target_port)) + except Exception as e: + print(f"傳送數據失敗: {e}") + + time.sleep(0.1) + except Exception as e: + print(f"數據傳輸線程錯誤: {e}") + finally: + # 確保關閉日誌文件 + log_file.close() + print("日誌文件已關閉") + +# 創建並啟動數據發送線程 +sender_thread = threading.Thread(target=data_sending_thread) +sender_thread.daemon = True +sender_thread.start() + +# 啟動命令行控制界面 +try: + control_interface() +except KeyboardInterrupt: + print("\n程序已被用戶中斷") +finally: + if connection: + connection.mav.request_data_stream_send( + connection.target_system, + connection.target_component, + mavutil.mavlink.MAV_DATA_STREAM_ALL, + 0, 0 + ) + print("數據流已停止") + sock.close() + print("網絡連接已關閉") + print(f"總共傳送了 {packet_counter} 個封包") + print("日誌文件已儲存") \ No newline at end of file diff --git a/src/unitdev03/0519/test_transform.py b/src/unitdev03/0519/test_transform.py new file mode 100644 index 0000000..96b22be --- /dev/null +++ b/src/unitdev03/0519/test_transform.py @@ -0,0 +1,663 @@ +import rclpy +from pymavlink import mavutil +from time import sleep +import time + +import socket +import struct + +# used at fdm_switch_example_two +import threading +import queue + +import json + +def mavlink_analyzer_simple(count = 500): + # rclpy.init() + # node = rclpy.create_node('mavlink_analyzer_simple') + + print('Inintializing connection') + + connection_string="udp:127.0.0.1:14550" + connection = mavutil.mavlink_connection(connection_string) + + print('Catch messages') + msg_count = {} + + start_time = time.time() + + for _ in range(count): + msg = connection.recv_msg() + # msg = connection.recv_match(blocking=True) + if msg: + msg_type = msg.get_type() + if msg_type in msg_count: + msg_count[msg_type] += 1 + else: + msg_count[msg_type] = 1 + + if msg.get_type() == 'SERVO_OUTPUT_RAW': + print(msg) + pass + + else: + print('No message yet, 1 second for data to fill') + sleep(1) + + print('Messages Type received:') + print(msg_count) + + end_time = time.time() + print('Time elapsed: ', end_time - start_time) + + print('Closing connection') + connection.close() + + +def fdm_parser_example(data=None): + if data is None: + data = bytes.fromhex('1a48e903b1340500e803e803e803e803000000000000000000000000000000000000000000000000') + # 1a48e903b1340500e803e803e803e803000000000000000000000000000000000000000000000000 + + + parse_format = 'HHI16H' + + if len(data) != struct.calcsize(parse_format): + print("got packet of len %u, expected %u" % (len(data), struct.calcsize(parse_format))) + exit(1) + + decoded = struct.unpack(parse_format,data) + + magic = decoded[0] + frame_rate_hz = decoded[1] + frame_count = decoded[2] + pwm = decoded[3:] + + + print("magic: %u, frame_rate_hz: %u, frame_count: %u, pwm: %s" % (magic, frame_rate_hz, frame_count, pwm)) + +def fdm_switch_example_one(): + + ''' + 這個範例在 SITL 與 Gazebo 之間 做一個簡單的UDP轉發器 + 沒有轉換數據格式 + ''' + # make a link get fdm from SITL, as a server + sock_s2g = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock_s2g.bind(('', 9012)) + sock_s2g.settimeout(0.1) + + # set a target link pass JSON to Gazebo, as a client + sock_g2s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + server_address = ('127.0.0.1', 19012) + + packet_count = 0 + start_time = time.time() + + while True: + try: + # 接到SITL的數據 並轉發給Gazebo + data_s2g, addr_s2g = sock_s2g.recvfrom(1024) + sock_g2s.sendto(data_s2g, server_address) + # 得到Gazebo的回應 並轉發給SITL + data_g2s, addr_g2s = sock_g2s.recvfrom(1024) + sock_s2g.sendto(data_g2s, addr_s2g) + + # 解析並保存接收到的數據到 JSON 檔案 (SITL -> Gazebo) + try: + # 解析從 SITL 到 Gazebo 的數據 + decoded = struct.unpack('HHI16H', data_s2g) + s2g_json = { + "magic": decoded[0], + "frame_rate_hz": decoded[1], + "frame_count": decoded[2], + "pwm": list(decoded[3:]) + } + + # 解析從 Gazebo 到 SITL 的數據 + # 注意:假設從 Gazebo 返回的數據是字串形式的 JSON + try: + g2s_data_str = data_g2s.decode('utf-8') + # 保存原始數據,適合後續分析 + with open('gazebo_response_raw.txt', 'a') as f: + f.write(g2s_data_str + '\n') + except: + # 如果不是 UTF-8 格式,可能是二進制數據 + g2s_data_str = str(data_g2s) + + # 合併數據到一個 JSON 對象 + combined_data = { + "sitl_to_gazebo": s2g_json, + "gazebo_to_sitl": g2s_data_str, + "timestamp": time.time() + } + + # 寫入 JSON 檔案 + with open('fdm_one_data.json', 'w') as f: + json.dump(combined_data, f, indent=2) + + except Exception as e: + print(f"JSON 處理錯誤: {e}") + + packet_count += 1 + except socket.timeout: + print(f'no value') + #print(f'address:{addr_s2g}') + #pass + + current_time = time.time() + elapsed_time = current_time - start_time + + if elapsed_time >= 1.0: + #print(f"Packets received in the last second: {packet_count}") + print(f'JSON Pack:{data_g2s}') + packet_count = 0 + start_time = current_time + fdm_parser_example(data_s2g) + + +def fdm_switch_example_two(): + ''' + 這個範例在 SITL 與 Gazebo 之間 做一個簡單的UDP轉發器 + 有轉換數據格式 + + time_usec + servo1_raw + servo16_raw + ''' + # read info from SITL via MAVLink + connection_string="udp:127.0.0.1:14550" # uart mavlink # MATLAB + connection = mavutil.mavlink_connection(connection_string) + + # set a target link pass JSON to Gazebo, as a client + sock_g2s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + server_address = ('127.0.0.1', 19012) + + data_queue = queue.Queue() + servo_out = [0] * 16 + data_queue.put(servo_out) + + def send_data_from_queue(sock, server_address, q, frame_rate_fdm=600, frame_count_fdm=1): + interval = 1.0 / frame_rate_fdm + while True: + start_time = time.time() + try: + data = q.get(timeout=0.1) + if data is None: + break + last_data = data + except queue.Empty: + data = last_data + data = struct.pack('HHI16H', 0x481a, frame_rate_fdm, frame_count_fdm, *servo_out) + sock.sendto(data, server_address) + + # 新增:寫入 JSON 檔案 + data_json = { + "magic": 0x481a, + "frame_rate_hz": frame_rate_fdm, + "frame_count": frame_count_fdm, + "pwm": list(servo_out) + } + with open('fdm_data.json', 'w') as f: + json.dump(data_json, f) + + frame_count_fdm += 1 + fdm_parser_example(data) # debug + + elapsed_time = time.time() - start_time + sleep_time = interval - elapsed_time + if sleep_time > 0: + sleep(sleep_time) + + Running = True + thread = threading.Thread(target=send_data_from_queue, args=(sock_g2s, server_address, data_queue)) + thread.start() + + while Running: + msgb1 = None + msg = connection.recv_msg() + while msg : + if msg.get_type() == 'SERVO_OUTPUT_RAW': + msgb1 = msg + # break # 這裡不需要break,因為我要最後一個 servo_output_raw 的值 + msg = connection.recv_msg() + + if msgb1: + for i in range(16): + servo_out[i] = getattr(msgb1, f'servo{i+1}_raw') + data_queue.put(servo_out) + msgb1 = None + # Running = False # debug + else: + # print('No message yet, 10 ms for data to fill') + sleep(0.01) + + # Example of putting data into the queue + # data_queue.put(data) + # To stop the thread, you can put None into the queue + # data_queue.put(None) + + +def fdm_switch_example_one_with_remote_forwarding(): + ''' + 這個範例在 SITL 與 Gazebo 之間 做一個簡單的UDP轉發器 + 並且將JSON數據轉發到指定的遠端IP + ''' + # make a link get fdm from SITL, as a server + sock_s2g = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock_s2g.bind(('', 9012)) + sock_s2g.settimeout(0.1) + + # set a target link pass JSON to Gazebo, as a client + sock_g2s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + server_address = ('127.0.0.1', 19012) + + # 建立與遠端IP的socket連接 + remote_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + remote_address = ('140.120.31.130', 9003) # 遠端IP與埠號,您可以根據需要更改埠號 + print(f'Setting up forwarding to remote address: {remote_address}') + + packet_count = 0 + start_time = time.time() + + while True: + try: + # 接到SITL的數據 並轉發給Gazebo + data_s2g, addr_s2g = sock_s2g.recvfrom(1024) + sock_g2s.sendto(data_s2g, server_address) + + # 得到Gazebo的回應 並轉發給SITL + data_g2s, addr_g2s = sock_g2s.recvfrom(1024) + sock_s2g.sendto(data_g2s, addr_s2g) + + # 解析並保存接收到的數據到 JSON 檔案 (SITL -> Gazebo) + try: + # 解析從 SITL 到 Gazebo 的數據 + decoded = struct.unpack('HHI16H', data_s2g) + s2g_json = { + "magic": decoded[0], + "frame_rate_hz": decoded[1], + "frame_count": decoded[2], + "pwm": list(decoded[3:]) + } + + # 解析從 Gazebo 到 SITL 的數據 + try: + g2s_data_str = data_g2s.decode('utf-8') + # 保存原始數據,適合後續分析 + with open('gazebo_response_raw.txt', 'a') as f: + f.write(g2s_data_str + '\n') + except: + # 如果不是 UTF-8 格式,可能是二進制數據 + g2s_data_str = str(data_g2s) + + # 合併數據到一個 JSON 對象 + combined_data = { + "sitl_to_gazebo": s2g_json, + "gazebo_to_sitl": g2s_data_str, + "timestamp": time.time() + } + + # 寫入 JSON 檔案 + with open('fdm_one_data.json', 'w') as f: + json.dump(combined_data, f, indent=2) + + # 將JSON數據轉發到遠端IP + remote_socket.sendto(json.dumps(combined_data).encode(), remote_address) + + except Exception as e: + print(f"JSON 處理錯誤: {e}") + + packet_count += 1 + except socket.timeout: + print(f'no value') + #pass + + current_time = time.time() + elapsed_time = current_time - start_time + + if elapsed_time >= 1.0: + print(f'JSON Pack:{data_g2s}') + print(f'Forwarded data to remote IP: 140.120.31.130') + packet_count = 0 + start_time = current_time + fdm_parser_example(data_s2g) + + +def fdm_switch_multiple(num_systems=3): + ''' + Create multiple UDP forwarders between SITL and Gazebo using threading + Each forwarder uses its own set of ports following the pattern: + - SITL ports: 9002, 9012, 9022, ... + - Gazebo ports: 19002, 19012, 19022, ... + + Parameters: + num_systems (int): Number of forwarding pairs to create + ''' + # Flag to control thread execution + running = True + + def forwarder_thread(system_id): + sitl_port = 9002 + (system_id * 10) + gazebo_port = 19002 + (system_id * 10) + + # Create SITL socket (server) + sitl_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sitl_socket.bind(('', sitl_port)) + sitl_socket.settimeout(0.1) + + # Create Gazebo socket (client) + gazebo_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + gazebo_socket.settimeout(0.1) + gazebo_address = ('127.0.0.1', gazebo_port) + + print(f"System {system_id}: Forwarding {sitl_port} → {gazebo_port}") + + packet_count = 0 + start_time = time.time() + last_data_s2g = None + + try: + while running: + try: + # Receive from SITL and forward to Gazebo + data_s2g, addr_s2g = sitl_socket.recvfrom(1024) + last_data_s2g = data_s2g + gazebo_socket.sendto(data_s2g, gazebo_address) + + try: + # Get response from Gazebo and send back to SITL + data_g2s, addr_g2s = gazebo_socket.recvfrom(1024) + sitl_socket.sendto(data_g2s, addr_s2g) + + # Process and store data + try: + # Parse SITL to Gazebo data + decoded = struct.unpack('HHI16H', data_s2g) + s2g_json = { + "magic": decoded[0], + "frame_rate_hz": decoded[1], + "frame_count": decoded[2], + "pwm": list(decoded[3:]) + } + + # Try to parse Gazebo to SITL data + try: + g2s_data_str = data_g2s.decode('utf-8') + if packet_count % 50 == 0: + with open(f'gazebo_response_raw_{system_id}.txt', 'a') as f: + f.write(g2s_data_str + '\n') + except: + g2s_data_str = str(data_g2s) + + # Combine data + combined_data = { + "sitl_to_gazebo": s2g_json, + "gazebo_to_sitl": g2s_data_str, + "timestamp": time.time(), + "system_id": system_id, + "sitl_port": sitl_port, + "gazebo_port": gazebo_port + } + + # Save to JSON file (once per second) + current_time = time.time() + elapsed_time = current_time - start_time + if elapsed_time >= 1.0: + with open(f'fdm_data_system_{system_id}.json', 'w') as f: + json.dump(combined_data, f, indent=2) + + except Exception as e: + print(f"System {system_id} - JSON processing error: {e}") + + packet_count += 1 + + except socket.timeout: + # No response from Gazebo, continue + pass + + except socket.timeout: + # No data from SITL, continue + pass + + # Print stats periodically + current_time = time.time() + elapsed_time = current_time - start_time + + if elapsed_time >= 1.0: + # print(f"System {system_id} - Packets: {packet_count} in the last second") + if packet_count > 0 and last_data_s2g: + print(f'System {system_id} - JSON Pack: {data_g2s}') + fdm_parser_example(last_data_s2g) + packet_count = 0 + start_time = current_time + + except Exception as e: + print(f"System {system_id} - Error in forwarder thread: {e}") + finally: + # Clean up resources + sitl_socket.close() + gazebo_socket.close() + print(f"System {system_id} - Forwarder thread terminated") + + # Create and start threads for each system + threads = [] + for i in range(num_systems): + thread = threading.Thread(target=forwarder_thread, args=(i,)) + thread.daemon = True + threads.append(thread) + thread.start() + + # Keep the main thread running + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nShutting down all forwarders...") + running = False + time.sleep(1) + print("Done.") + return + + +def fdm_switch_multiple_with_remote(num_systems=3, remote_ip='140.120.31.130'): + ''' + Create multiple UDP forwarders between SITL and Gazebo using threading + Also forwards the combined data to a remote IP address + + Each forwarder uses its own set of ports following the pattern: + - SITL ports: 9002, 9012, 9022, ... + - Gazebo ports: 19002, 19012, 19022, ... + - Remote ports: 9003, 9013, 9023, ... (for the remote forwarding) + + Parameters: + num_systems (int): Number of forwarding pairs to create + remote_ip (str): IP address for remote forwarding + ''' + # Flag to control thread execution + running = True + + # Add shared counters for total packets + total_packets_sent = 0 + total_packets_lock = threading.Lock() # For thread safety + + def forwarder_thread(system_id): + nonlocal total_packets_sent + sitl_port = 9002 + (system_id * 10) + gazebo_port = 19002 + (system_id * 10) + remote_port = 9003 + (system_id * 10) + + # Create SITL socket (server) + sitl_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sitl_socket.bind(('', sitl_port)) + sitl_socket.settimeout(0.1) + + # Create Gazebo socket (client) + gazebo_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + gazebo_socket.settimeout(0.1) + gazebo_address = ('127.0.0.1', gazebo_port) + + # Create remote forwarding socket + remote_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + remote_address = (remote_ip, remote_port) + + print(f"System {system_id}: SITL:{sitl_port} → Gazebo:{gazebo_port}, Remote:{remote_ip}:{remote_port}") + + packet_count = 0 + start_time = time.time() + last_data_s2g = None + forward_counter = 0 # 添加轉發計數器 + system_packets_sent = 0 # Local counter for this system + + try: + while running: + try: + # Receive from SITL and forward to Gazebo + data_s2g, addr_s2g = sitl_socket.recvfrom(1024) + last_data_s2g = data_s2g + gazebo_socket.sendto(data_s2g, gazebo_address) + + try: + # Get response from Gazebo and send back to SITL + data_g2s, addr_g2s = gazebo_socket.recvfrom(1024) + sitl_socket.sendto(data_g2s, addr_s2g) + + # Process and store data + try: + # Parse SITL to Gazebo data + decoded = struct.unpack('HHI16H', data_s2g) + s2g_json = { + "magic": decoded[0], + "frame_rate_hz": decoded[1], + "frame_count": decoded[2], + "pwm": list(decoded[3:]) + } + + # Try to parse Gazebo to SITL data + try: + g2s_data_str = data_g2s.decode('utf-8') + if packet_count % 50 == 0: + with open(f'gazebo_response_raw_{system_id}.txt', 'a') as f: + f.write(g2s_data_str + '\n') + except: + g2s_data_str = str(data_g2s) + + # Combine data + combined_data = { + "sitl_to_gazebo": s2g_json, + "gazebo_to_sitl": g2s_data_str, + "timestamp": time.time(), + "system_id": system_id, + "sitl_port": sitl_port, + "gazebo_port": gazebo_port + } + + ''' + # 每n筆才轉發一次 + forward_counter += 1 + if forward_counter >= 18: + # Forward data to remote server + remote_socket.sendto(json.dumps(combined_data).encode(), remote_address) + #print(f"System {system_id} - Send data to remote (第{packet_count}個包)") + forward_counter = 0 # 重置計數器 + ''' + + # 使用模運算代替計數器重置 + forward_counter += 1 + if packet_count % 1 == 0: + # Forward data to remote server + remote_socket.sendto(json.dumps(combined_data).encode(), remote_address) + system_packets_sent += 1 + with total_packets_lock: + total_packets_sent += 1 + # Debug用,看每秒傳了多少筆出去 + #print(f"System {system_id} - Send data to remote (第{packet_count}個包)") + + # Save to JSON file (once per second) + current_time = time.time() + elapsed_time = current_time - start_time + if elapsed_time >= 1.0: + with open(f'fdm_data_system_{system_id}.json', 'w') as f: + json.dump(combined_data, f, indent=2) + + except Exception as e: + print(f"System {system_id} - JSON processing error: {e}") + + packet_count += 1 + + except socket.timeout: + # No response from Gazebo, continue + pass + + except socket.timeout: + # No data from SITL, continue + pass + + # Print stats periodically + current_time = time.time() + elapsed_time = current_time - start_time + + if elapsed_time >= 1.0: + print(f"System {system_id} - Packets: {packet_count} in the last second") + if packet_count > 0 and last_data_s2g: + fdm_parser_example(last_data_s2g) + packet_count = 0 + start_time = current_time + + except Exception as e: + print(f"System {system_id} - Error in forwarder thread: {e}") + finally: + # Clean up resources + sitl_socket.close() + gazebo_socket.close() + remote_socket.close() + print(f"System {system_id} - Forwarder thread terminated") + + # Create statistics display thread + def stats_display_thread(): + start_time = time.time() + while running: + time.sleep(5) # Display stats every 5 seconds + elapsed = time.time() - start_time + with total_packets_lock: + rate = total_packets_sent / elapsed if elapsed > 0 else 0 + print(f"\n===== STATISTICS =====") + print(f"Total packets sent: {total_packets_sent}") + print(f"Average rate: {rate:.2f} packets/sec") + print(f"Running time: {elapsed:.1f} seconds") + print(f"======================\n") + + # Create and start threads for each system + threads = [] + for i in range(num_systems): + thread = threading.Thread(target=forwarder_thread, args=(i,)) + thread.daemon = True + threads.append(thread) + thread.start() + + # Start stats display thread + stats_thread = threading.Thread(target=stats_display_thread) + stats_thread.daemon = True + stats_thread.start() + + # Keep the main thread running + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nShutting down all forwarders...") + running = False + time.sleep(1) + print(f"Final total packets sent: {total_packets_sent}") + print("Done.") + return + + +print('Start') +#fdm_switch_example_two() +#mavlink_analyzer_simple() +#fdm_switch_example_one() +#fdm_switch_example_one_with_remote_forwarding() +#fdm_switch_multiple(1) # Create 3 forwarding pairs +fdm_switch_multiple_with_remote(3) # Create 1 forwarding pair with remote forwarding +#fdm_parser_example() \ No newline at end of file