From 7ce094d211dda9550ee72b98f3fb6f0c66ec0573 Mon Sep 17 00:00:00 2001 From: Chiyu Chen Date: Mon, 20 Apr 2026 15:02:47 +0800 Subject: [PATCH] =?UTF-8?q?(temp)=20=E6=95=B4=E7=90=86=E6=AA=94=E6=A1=88?= =?UTF-8?q?=E8=88=87=E8=A8=BB=E8=A7=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 - .../fc_network_adapter/serialManager.py | 18 +- .../unitdev02/{test01.md => devnote2.md} | 0 .../unitdev02/{test01.py => fdm_switch.py} | 0 src/unitdev02/unitdev02/lifecycle_node.py | 45 - src/unitdev02/unitdev02/ping_test.py | 188 ++ src/unitdev02/unitdev02/pymav_nullbuffer.py | 46 + src/unitdev02/unitdev02/pymavlik_util.py | 2710 +++++++++++++++++ src/unitdev02/unitdev02/serial_udp_ad.py | 71 + src/unitdev02/unitdev02/sslChech.sh | 27 - src/unitdev02/unitdev02/udp_unix_ad.py | 40 + 11 files changed, 3063 insertions(+), 84 deletions(-) rename src/unitdev02/unitdev02/{test01.md => devnote2.md} (100%) rename src/unitdev02/unitdev02/{test01.py => fdm_switch.py} (100%) delete mode 100644 src/unitdev02/unitdev02/lifecycle_node.py create mode 100644 src/unitdev02/unitdev02/ping_test.py create mode 100644 src/unitdev02/unitdev02/pymav_nullbuffer.py create mode 100644 src/unitdev02/unitdev02/pymavlik_util.py create mode 100644 src/unitdev02/unitdev02/serial_udp_ad.py delete mode 100644 src/unitdev02/unitdev02/sslChech.sh create mode 100644 src/unitdev02/unitdev02/udp_unix_ad.py diff --git a/README.md b/README.md index 7c2689a..d20e46d 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,5 @@ 這是天雷系統的專案 - - === ## 功能簡介 1. mavlink 多對多支援平台 diff --git a/src/fc_network_adapter/fc_network_adapter/serialManager.py b/src/fc_network_adapter/fc_network_adapter/serialManager.py index 8638432..2d1c715 100644 --- a/src/fc_network_adapter/fc_network_adapter/serialManager.py +++ b/src/fc_network_adapter/fc_network_adapter/serialManager.py @@ -38,7 +38,7 @@ class SerialMode(Enum): NOT_USE = auto() # 不使用 -# ====================== Frame Processor 基類與實現 ===================== +# ====================== Frame Processor Base and Implementation ===================== class FrameProcessor(ABC): """協議處理器基類""" @@ -123,11 +123,9 @@ class XBeeFrameProcessor(FrameProcessor): def process_outgoing(self, data: bytes) -> bytes: """將數據封裝為 XBee API 傳輸幀""" - return XBeeFrameProcessor.encapsulate_data(data) + return XBeeFrameHandler.encapsulate_data(data) -# ====================== XBee Frame Handler ===================== - class XBeeFrameHandler: """XBee API Frame 處理器""" @@ -200,6 +198,10 @@ class XBeeFrameHandler: return data[rf_data_start:3 + length] + + +# ====================== Dongle Command Handler ===================== + class ATCommandHandler: """AT 指令回應處理器""" @@ -240,9 +242,7 @@ class ATCommandHandler: """處理 SL (Serial Number Low)""" pass - -# ====================== Serial Handler ===================== - +# ================ Serial UDP Socket Object ============== class SerialHandler(asyncio.Protocol): """asyncio.Protocol 用於處理 Serial 收發""" @@ -291,8 +291,6 @@ class SerialHandler(asyncio.Protocol): ) -# ====================== UDP Handler ===================== - class UDPHandler(asyncio.DatagramProtocol): """asyncio.DatagramProtocol 用於處理 UDP 收發""" @@ -600,7 +598,7 @@ if __name__ == '__main__': SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 SERIAL_BAUDRATE = 115200 - UDP_REMOTE_PORT = 14571 + UDP_REMOTE_PORT = 14561 sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT) # SERIAL_PORT = '/dev/ttyACM0' # 手動指定 diff --git a/src/unitdev02/unitdev02/test01.md b/src/unitdev02/unitdev02/devnote2.md similarity index 100% rename from src/unitdev02/unitdev02/test01.md rename to src/unitdev02/unitdev02/devnote2.md diff --git a/src/unitdev02/unitdev02/test01.py b/src/unitdev02/unitdev02/fdm_switch.py similarity index 100% rename from src/unitdev02/unitdev02/test01.py rename to src/unitdev02/unitdev02/fdm_switch.py diff --git a/src/unitdev02/unitdev02/lifecycle_node.py b/src/unitdev02/unitdev02/lifecycle_node.py deleted file mode 100644 index bebc82e..0000000 --- a/src/unitdev02/unitdev02/lifecycle_node.py +++ /dev/null @@ -1,45 +0,0 @@ - - - -import rclpy -from rclpy.node import Node -from std_msgs.msg import String -import time - -# import mavros_msgs.srv - -class TalkerNode(Node): - def __init__(self): - start_time = time.time() - super().__init__('talker_node') - end_time = time.time() - print(f"Node initialization took {end_time - start_time:.2f} seconds") - - self.publisher_ = self.create_publisher(String, 'hahatest/_1', 10) - self.timer = self.create_timer(1.0, self.timer_callback) # 每秒執行一次 - self.get_logger().info('TalkerNode has been started.') - - def timer_callback(self): - msg = String() - msg.data = 'Hello, ROS 2!' - self.publisher_.publish(msg) - self.get_logger().info(f'Published: "{msg.data}"') - -def main(args=None): - rclpy.init(args=args) - node = TalkerNode() - - print("Before sleep") - time.sleep(5) # 等待 5 秒鐘 - print("After sleep") - try: - start_time = time.time() - while time.time() - start_time < 10: # 持續 10 秒鐘 - rclpy.spin_once(node) - time.sleep(1) # 每秒執行一次 - finally: - node.destroy_node() - rclpy.shutdown() - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/src/unitdev02/unitdev02/ping_test.py b/src/unitdev02/unitdev02/ping_test.py new file mode 100644 index 0000000..d17da0d --- /dev/null +++ b/src/unitdev02/unitdev02/ping_test.py @@ -0,0 +1,188 @@ +import time +from pymavlink import mavutil + +# # 強制 source_system=255 +# master = mavutil.mavlink_connection('udpout:localhost:14561', source_system=255) + +# print("Starting Force-Ping Test...") + +# while True: +# # 1. 發送 Heartbeat (證明連線活著) +# master.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GCS, +# mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0) + +# # 2. 強制發送 PING (target 1, 1) +# # 我們加一個計數器,方便在 tcpdump 裡辨識 +# ping_seq = int(time.time()) % 100 +# master.mav.ping_send(int(time.time() * 1e6), ping_seq, 1, 1) + +# print(f"Sent Heartbeat and PING (seq {ping_seq})") + +# # 3. 非阻塞讀取,避免卡死 +# msg = master.recv_match(blocking=False) +# if msg: +# print(f"--> Received something: {msg.get_type()}") + +# time.sleep(1) # 每秒噴一次 + +# ------------------------------------------------------------------- + +# # 改成 udpin,這會讓你的程式守在 14561 等模擬器送資料進來 +# # 這樣你就能收到模擬器的 Heartbeat,建立連線後再發 PING +# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255) + +# print("Waiting for simulator heartbeat...") +# # 這一行很重要:先收到 Heartbeat,pymavlink 才會設定好 target 地址 +# master.wait_heartbeat() +# print(f"Heartbeat from system {master.target_system} component {master.target_component}") + +# while True: +# # 既然收到了,現在發 PING 就會順著原路回去 +# master.mav.ping_send(int(time.time() * 1e6), 1, master.target_system, master.target_component) +# print("PING sent!") + +# msg = master.recv_match(type='PING', blocking=True, timeout=2) +# if msg: +# print(f"Got PING response: {msg}") +# time.sleep(1) + + +# ------------------------------------------------------------------- + +# import time +# from pymavlink import mavutil + +# # 核心修改:因為 MAVProxy 正在往 14561 噴資料,我們必須「接住」它 +# # 使用 udpin 會讓你的程式在 14561 監聽 +# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=254) + +# print("正在等待 MAVProxy 轉發的 Heartbeat...") + +# # 只要這行跑過,代表你跟 MAVProxy 握手成功了 +# master.wait_heartbeat() +# print(f"成功連線到系統 {master.target_system}!") + +# while True: +# # 1. 務必發送 Heartbeat,讓 MAVProxy 知道要把回應送回這個 Port +# master.mav.heartbeat_send( +# mavutil.mavlink.MAV_TYPE_GCS, +# mavutil.mavlink.MAV_AUTOPILOT_INVALID, +# 0, 0, 0 +# ) + +# # 2. 修改 target_component 為 1 (飛控核心) +# # 注意:這裡刻意將 seq 設為動態,方便你在 watch PING 裡觀察 +# p_seq = int(time.time()) % 255 +# master.mav.ping_send( +# int(time.time() * 1e6), +# p_seq, +# 1, +# 1 # <--- 改成 1 試試看! +# ) +# print(f"Sent PING seq {p_seq} to 1/1") + +# # 3. 讀取所有訊息,看看有沒有 PING 回來 +# # 有時候回應的 target_system 可能是 255 (你的 ID) +# msg = master.recv_match(type='PING', blocking=True, timeout=1.0) +# if msg: +# print(f"🎉 成功!收到回應: {msg}") + +# time.sleep(1) + + + +# ------------------------------------------------------------------- + + +# import time +# from pymavlink import mavutil + +# # 建議一樣用 udpin 監聽 MAVProxy 的輸出 +# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255) + +# def get_time_ns(): +# return int(time.time() * 1e9) # 轉為奈秒 + +# print("開始測試 TIMESYNC 響應...") + +# while True: +# now_ns = get_time_ns() + +# # 發送 TIMESYNC 請求 +# # tc1=0 代表這是請求,ts1=當前時間 +# master.mav.timesync_send(0, now_ns) + +# # 等待回應 +# msg = master.recv_match(type='TIMESYNC', blocking=True, timeout=1.0) + +# if msg and msg.tc1 != 0: +# # 收到回應,msg.tc1 應該等於我們剛才送出的 now_ns +# rtt_ms = (get_time_ns() - msg.tc1) / 1e6 +# print(f"🕒 TIMESYNC RTT: {rtt_ms:.3f} ms") +# break # 測試一次就好 +# else: +# print("等待 TIMESYNC 回應中...") + +# time.sleep(1) + +# ------------------------------------------------------------------- + +import time +import statistics +from pymavlink import mavutil + +# 連接到 MAVProxy 轉發的端口 +master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255) + +# 測試設定 +TARGET_SYS = 2 # 你想測試的載具 ID +SAMPLES = 20 # 取 20 次樣本來算平均和抖動 +rtt_history = [] + +print(f"開始測試對 System {TARGET_SYS} 的通道品質...") + +def get_time_ns(): + return int(time.time() * 1e9) + +try: + while len(rtt_history) < SAMPLES: + ts1_send = get_time_ns() + + # 發送 TIMESYNC (V1/V2 格式相容) + # 針對特定系統發送 + master.mav.timesync_send(0, ts1_send) + + # 等待回應 (過濾目標系統的訊息) + msg = master.recv_match(type='TIMESYNC', blocking=True, timeout=1.0) + + if msg and msg.get_srcSystem() == TARGET_SYS and msg.tc1 != 0: + ts1_recv = get_time_ns() + rtt_ns = ts1_recv - msg.tc1 + rtt_ms = rtt_ns / 1e6 + rtt_history.append(rtt_ms) + print(f"Sample {len(rtt_history)}: RTT = {rtt_ms:.3f} ms") + else: + print("Request timeout or mismatched ID...") + + time.sleep(0.1) # 快速採樣 + + # 4. 計算統計數據 + avg_latency = statistics.mean(rtt_history) + std_dev = statistics.stdev(rtt_history) # 這就是 Jitter 的一種表現 + max_rtt = max(rtt_history) + min_rtt = min(rtt_history) + + print("\n--- 物理通道品質分析報告 ---") + print(f"目標載具 : System {TARGET_SYS}") + print(f"平均延遲 (Avg) : {avg_latency:.3f} ms") + print(f"最小延遲 (Min) : {min_rtt:.3f} ms") + print(f"最大延遲 (Max) : {max_rtt:.3f} ms") + print(f"抖動幅度 (Jitter): {std_dev:.3f} ms (標準差)") + + if std_dev > 10: + print("警告:網路抖動過高,可能影響控制穩定性!") + else: + print("通道品質:良好") + +except KeyboardInterrupt: + print("測試終止。") \ No newline at end of file diff --git a/src/unitdev02/unitdev02/pymav_nullbuffer.py b/src/unitdev02/unitdev02/pymav_nullbuffer.py new file mode 100644 index 0000000..286f8c2 --- /dev/null +++ b/src/unitdev02/unitdev02/pymav_nullbuffer.py @@ -0,0 +1,46 @@ +from pymavlink.dialects.v20 import common as mavlink + +class NullBuffer: + def write(self, data): pass + def seek(self, pos): pass + def tell(self): return 0 + +class CapturingBuffer: + def __init__(self): + self.last_data = b'' + def write(self, data): + self.last_data = bytes(data) + def seek(self, pos): pass + def tell(self): return 0 + +# 1. 初始化(僅作為編碼器) +# file 參數可以是任何擁有 write() 方法的物件,這裡用 BytesIO 模擬 +# 初始化方法一:使用 BytesIO +# import io +# out_buf = io.BytesIO() +# mav = mavlink.MAVLink(out_buf, srcSystem=1, srcComponent=191) + +# 初始化方法二:使用自定義的 NullBuffer +mav = mavlink.MAVLink(NullBuffer(), srcSystem=1, srcComponent=191) +mav.seq = 254 + +# 2. 建立心跳包並取得二進制數據 +# 連續產出「不同」的訊息物件 +for i in range(3): + msg = mav.heartbeat_encode(mavlink.MAV_TYPE_GCS, 0, 0, 0, 0) + data = msg.pack(mav) + mav.seq = (mav.seq + 1) & 0xFF + # MAVLink 2 的序列號在第 3 個 Byte (Index 2) + print(f"第 {i} 次發送, Seq: {data[4]}, Raw: {data.hex()}") + + + +print("分隔線") + +buf = CapturingBuffer() +mav_buf2 = mavlink.MAVLink(buf, srcSystem=1, srcComponent=191) + +for i in range(3): + mav_buf2.heartbeat_send(mavlink.MAV_TYPE_GCS, 0, 0, 0, 0) + data = buf.last_data + print(f"第 {i} 次發送, Seq: {data[4]}, Raw: {data.hex()}") \ No newline at end of file diff --git a/src/unitdev02/unitdev02/pymavlik_util.py b/src/unitdev02/unitdev02/pymavlik_util.py new file mode 100644 index 0000000..67e5d99 --- /dev/null +++ b/src/unitdev02/unitdev02/pymavlik_util.py @@ -0,0 +1,2710 @@ +#!/usr/bin/env python3 +''' +mavlink python utility functions + +Copyright Andrew Tridgell 2011-2019 +Released under GNU LGPL version 3 or later +''' +from __future__ import print_function +from builtins import object + +import socket, math, struct, time, os, fnmatch, array, sys, errno +import select +import copy +import json +import re +import platform +from pymavlink import mavexpression + +# We want to re-export x25crc here +from pymavlink.generator.mavcrc import x25crc as x25crc + +# adding these extra imports allows pymavlink to be used directly with pyinstaller +# without having complex spec files. To allow for installs that don't have ardupilotmega +# at all we avoid throwing an exception if it isn't installed +try: + from pymavlink.dialects.v10 import ardupilotmega +except Exception: + pass + +# maximum packet length for a single receive call - use the UDP limit +UDP_MAX_PACKET_LEN = 65535 + +# Store the MAVLink library for the currently-selected dialect +# (set by set_dialect()) +mavlink = None + +# Store the mavlink file currently being operated on +# (set by mavlink_connection()) +mavfile_global = None + +# If the caller hasn't specified a particular native/legacy version, use this +default_native = False + +# link_id used for signing +global_link_id = 0 + +# Use a globally-set MAVLink dialect if one has been specified as an environment variable. +if not 'MAVLINK_DIALECT' in os.environ: + os.environ['MAVLINK_DIALECT'] = 'ardupilotmega' + +def mavlink10(): + '''return True if using MAVLink 1.0 or later''' + return not 'MAVLINK09' in os.environ + +def mavlink20(): + '''return True if using MAVLink 2.0''' + return 'MAVLINK20' in os.environ + +def evaluate_expression(expression, vars, nocondition=False): + '''evaluation an expression''' + return mavexpression.evaluate_expression(expression, vars, nocondition) + +def evaluate_condition(condition, vars): + '''evaluation a conditional (boolean) statement''' + if condition is None: + return True + v = evaluate_expression(condition, vars) + if v is None: + return False + return v + +def u_ord(c): + return c + +class location(object): + '''represent a GPS coordinate''' + def __init__(self, lat, lng, alt=0, heading=0): + self.lat = lat # in degrees + self.lng = lng # in degrees + self.alt = alt # in metres + self.heading = heading + + def __str__(self): + return "lat=%.6f,lon=%.6f,alt=%.1f" % (self.lat, self.lng, self.alt) + +def add_message(messages, mtype, msg): + '''add a msg to array of messages, taking account of instance messages''' + if msg._instance_field is None or getattr(msg, msg._instance_field, None) is None: + # simple case, no instance field + messages[mtype] = msg + return + instance_value = getattr(msg, msg._instance_field) + if not mtype in messages: + messages[mtype] = copy.copy(msg) + messages[mtype]._instances = {} + messages[mtype]._instances[instance_value] = msg + messages["%s[%s]" % (mtype, str(instance_value))] = copy.copy(msg) + return + messages[mtype]._instances[instance_value] = msg + prev_instances = messages[mtype]._instances + messages[mtype] = copy.copy(msg) + messages[mtype]._instances = prev_instances + messages["%s[%s]" % (mtype, str(instance_value))] = copy.copy(msg) + +def set_dialect(dialect, with_type_annotations=None): + '''set the MAVLink dialect to work with. + For example, set_dialect("ardupilotmega") + ''' + global mavlink, current_dialect + from .generator import mavparse + + if with_type_annotations is not None: + print("with_type_annotations ignored; remove parameter") + + if 'MAVLINK20' in os.environ: + wire_protocol = mavparse.PROTOCOL_2_0 + modname = "pymavlink.dialects.v20." + dialect + elif mavlink is None or mavlink.WIRE_PROTOCOL_VERSION == "1.0" or not 'MAVLINK09' in os.environ: + wire_protocol = mavparse.PROTOCOL_1_0 + modname = "pymavlink.dialects.v10." + dialect + else: + wire_protocol = mavparse.PROTOCOL_0_9 + modname = "pymavlink.dialects.v09." + dialect + + try: + mod = __import__(modname) + except Exception: + # auto-generate the dialect module + from .generator.mavgen import mavgen_python_dialect + mavgen_python_dialect(dialect, wire_protocol) + mod = __import__(modname) + components = modname.split('.') + for comp in components[1:]: + mod = getattr(mod, comp) + current_dialect = dialect + mavlink = mod + +# Set the default dialect. This is done here as it needs to be after the function declaration +set_dialect(os.environ['MAVLINK_DIALECT']) + +class mavfile_state(object): + '''state for a particular system id''' + def __init__(self): + self.messages = { 'MAV' : self } + self.flightmode = "UNKNOWN" + self.vehicle_type = "UNKNOWN" + self.mav_type = mavlink.MAV_TYPE_FIXED_WING + self.mav_autopilot = mavlink.MAV_AUTOPILOT_GENERIC + self.base_mode = 0 + self.armed = False # canonical arm state for the vehicle as a whole + + if float(mavlink.WIRE_PROTOCOL_VERSION) >= 1: + try: + self.messages['HOME'] = mavlink.MAVLink_gps_raw_int_message(0,0,0,0,0,0,0,0,0,0) + except AttributeError: + # may be using a minimal dialect + pass + try: + mavlink.MAVLink_waypoint_message = mavlink.MAVLink_mission_item_message + except AttributeError: + # may be using a minimal dialect + pass + else: + try: + self.messages['HOME'] = mavlink.MAVLink_gps_raw_message(0,0,0,0,0,0,0,0,0) + except AttributeError: + # may be using a minimal dialect + pass + +class param_state(object): + '''state for a particular system id/component id pair''' + def __init__(self): + self.params = {} + +class mavfile(object): + '''a generic mavlink port''' + def __init__(self, fd, address, source_system=255, source_component=0, notimestamps=False, input=True, use_native=default_native): + global mavfile_global + if input: + mavfile_global = self + self.fd = fd + self.sysid = 0 + self.param_sysid = (0,0) + self.address = address + self.timestamp = 0 + self.last_seq = {} + self.mav_loss = 0 + self.mav_count = 0 + self.param_fetch_start = 0 + + # state for each sysid + self.sysid_state = {} + self.sysid_state[self.sysid] = mavfile_state() + + # param state for each sysid/compid tuple + self.param_state = {} + self.param_state[self.param_sysid] = param_state() + + # status of param fetch, indexed by sysid,compid tuple + self.source_system = source_system + self.source_component = source_component + self.first_byte = True + self.robust_parsing = True + self.mav = mavlink.MAVLink(self, srcSystem=self.source_system, srcComponent=self.source_component, use_native=use_native) + self.mav.robust_parsing = self.robust_parsing + self.logfile = None + self.logfile_raw = None + self.start_time = time.time() + self.message_hooks = [] + self.idle_hooks = [] + self.uptime = 0.0 + self.notimestamps = notimestamps + self._timestamp = None + self.WIRE_PROTOCOL_VERSION = mavlink.WIRE_PROTOCOL_VERSION + self.stop_on_EOF = False + self.portdead = False + + @property + def target_system(self): + return self.sysid + + @property + def target_component(self): + return self.param_sysid[1] + + @target_system.setter + def target_system(self, value): + self.sysid = value + if not self.sysid in self.sysid_state: + self.sysid_state[self.sysid] = mavfile_state() + if self.sysid != self.param_sysid[0]: + self.param_sysid = (self.sysid, self.param_sysid[1]) + if not self.param_sysid in self.param_state: + self.param_state[self.param_sysid] = param_state() + + @target_component.setter + def target_component(self, value): + if value != self.param_sysid[1]: + self.param_sysid = (self.param_sysid[0], value) + if not self.param_sysid in self.param_state: + self.param_state[self.param_sysid] = param_state() + + @property + def params(self): + if self.param_sysid[1] == 0: + eff_tuple = (self.sysid, 1) + if eff_tuple in self.param_state: + return getattr(self.param_state[eff_tuple],'params') + return getattr(self.param_state[self.param_sysid],'params') + + @property + def messages(self): + return getattr(self.sysid_state[self.sysid],'messages') + + @property + def flightmode(self): + return getattr(self.sysid_state[self.sysid],'flightmode') + + @flightmode.setter + def flightmode(self, value): + setattr(self.sysid_state[self.sysid],'flightmode',value) + + @property + def vehicle_type(self): + return getattr(self.sysid_state[self.sysid],'vehicle_type') + + @vehicle_type.setter + def vehicle_type(self, value): + setattr(self.sysid_state[self.sysid],'vehicle_type',value) + + @property + def mav_type(self): + return getattr(self.sysid_state[self.sysid],'mav_type') + + @mav_type.setter + def mav_type(self, value): + setattr(self.sysid_state[self.sysid],'mav_type',value) + + @property + def base_mode(self): + return getattr(self.sysid_state[self.sysid],'base_mode') + + @base_mode.setter + def base_mode(self, value): + setattr(self.sysid_state[self.sysid],'base_mode',value) + + def auto_mavlink_version(self, buf): + '''auto-switch mavlink protocol version''' + if len(buf) == 0: + return + try: + magic = ord(buf[0]) + except: + magic = buf[0] + if not magic in [ 85, 254, 253 ]: + return + self.first_byte = False + if self.WIRE_PROTOCOL_VERSION == "0.9" and magic == 254: + self.WIRE_PROTOCOL_VERSION = "1.0" + set_dialect(current_dialect) + elif self.WIRE_PROTOCOL_VERSION == "1.0" and magic == 85: + self.WIRE_PROTOCOL_VERSION = "0.9" + os.environ['MAVLINK09'] = '1' + set_dialect(current_dialect) + elif self.WIRE_PROTOCOL_VERSION != "2.0" and magic == 253: + self.WIRE_PROTOCOL_VERSION = "2.0" + os.environ['MAVLINK20'] = '1' + set_dialect(current_dialect) + else: + return + # switch protocol + (callback, callback_args, callback_kwargs) = (self.mav.callback, + self.mav.callback_args, + self.mav.callback_kwargs) + self.mav = mavlink.MAVLink(self, srcSystem=self.source_system, srcComponent=self.source_component) + self.mav.robust_parsing = self.robust_parsing + self.WIRE_PROTOCOL_VERSION = mavlink.WIRE_PROTOCOL_VERSION + (self.mav.callback, self.mav.callback_args, self.mav.callback_kwargs) = (callback, + callback_args, + callback_kwargs) + + def recv(self, n=None): + '''default recv method''' + raise RuntimeError('no recv() method supplied') + + def close(self, n=None): + '''default close method''' + raise RuntimeError('no close() method supplied') + + def write(self, buf): + '''default write method''' + raise RuntimeError('no write() method supplied') + + + def select(self, timeout): + '''wait for up to timeout seconds for more data''' + if self.fd is None: + time.sleep(min(timeout,0.5)) + return True + try: + (rin, win, xin) = select.select([self.fd], [], [], timeout) + except select.error: + return False + return len(rin) == 1 + + def pre_message(self): + '''default pre message call''' + return + + def set_rtscts(self, enable): + '''enable/disable RTS/CTS if applicable''' + return + + def probably_vehicle_heartbeat(self, msg): + if msg.get_srcComponent() == mavlink.MAV_COMP_ID_GIMBAL: + return False + if msg.type in (mavlink.MAV_TYPE_GCS, + mavlink.MAV_TYPE_GIMBAL, + mavlink.MAV_TYPE_ADSB, + mavlink.MAV_TYPE_ONBOARD_CONTROLLER): + return False + if msg.autopilot in frozenset([ + mavlink.MAV_AUTOPILOT_INVALID + ]): + return False + return True + + def post_message(self, msg): + '''default post message call''' + if '_posted' in msg.__dict__: + return + msg._posted = True + msg._timestamp = time.time() + type = msg.get_type() + + if 'usec' in msg.__dict__: + self.uptime = msg.usec * 1.0e-6 + if 'time_boot_ms' in msg.__dict__: + self.uptime = msg.time_boot_ms * 1.0e-3 + + if self._timestamp is not None: + if self.notimestamps: + msg._timestamp = self.uptime + else: + msg._timestamp = self._timestamp + + src_system = msg.get_srcSystem() + src_component = msg.get_srcComponent() + src_tuple = (src_system, src_component) + + radio_tuple = (ord('3'), ord('D')) + + if not src_system in self.sysid_state: + # we've seen a new system + self.sysid_state[src_system] = mavfile_state() + + add_message(self.sysid_state[src_system].messages, type, msg) + + if src_tuple == radio_tuple: + # as a special case radio msgs are added for all sysids + for s in self.sysid_state.keys(): + self.sysid_state[s].messages[type] = msg + + if not (src_tuple == radio_tuple or msg.get_msgId() < 0): + # Don't use unknown messages to calculate number of lost packets + if not src_tuple in self.last_seq: + last_seq = -1 + else: + last_seq = self.last_seq[src_tuple] + seq = (last_seq+1) % 256 + seq2 = msg.get_seq() + if seq != seq2 and last_seq != -1: + diff = (seq2 - seq) % 256 + self.mav_loss += diff + #print("lost %u seq=%u seq2=%u last_seq=%u src_tupe=%s %s" % (diff, seq, seq2, last_seq, str(src_tuple), msg.get_type())) + self.last_seq[src_tuple] = seq2 + self.mav_count += 1 + + self.timestamp = msg._timestamp + if type == 'HEARTBEAT' and self.probably_vehicle_heartbeat(msg): + if self.sysid == 0: + # lock onto id tuple of first vehicle heartbeat + self.sysid = src_system + if float(mavlink.WIRE_PROTOCOL_VERSION) >= 1: + self.sysid_state[src_system].flightmode = mode_string_v10(msg) + self.sysid_state[src_system].armed = (msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED) + self.sysid_state[src_system].mav_type = msg.type + self.sysid_state[src_system].mav_autopilot = msg.autopilot + elif type == 'HIGH_LATENCY2': + if self.sysid == 0: + # lock onto id tuple of first vehicle heartbeat + self.sysid = src_system + if msg.autopilot == mavlink.MAV_AUTOPILOT_ARDUPILOTMEGA: + self.sysid_state[src_system].base_mode = msg.custom0 + self.sysid_state[src_system].armed = (msg.custom0 & mavlink.MAV_MODE_FLAG_SAFETY_ARMED) + self.sysid_state[src_system].flightmode = mode_string_v10(msg) + self.sysid_state[src_system].mav_type = msg.type + self.sysid_state[src_system].mav_autopilot = msg.autopilot + + elif type == 'PARAM_VALUE': + if not src_tuple in self.param_state: + self.param_state[src_tuple] = param_state() + self.param_state[src_tuple].params[msg.param_id] = msg.param_value + elif type == 'SYS_STATUS' and mavlink.WIRE_PROTOCOL_VERSION == '0.9': + self.sysid_state[src_system].flightmode = mode_string_v09(msg) + elif type == 'GPS_RAW': + if self.sysid_state[src_system].messages['HOME'].fix_type < 2: + self.sysid_state[src_system].messages['HOME'] = msg + elif type == 'GPS_RAW_INT': + if self.sysid_state[src_system].messages['HOME'].fix_type < 3: + self.sysid_state[src_system].messages['HOME'] = msg + for hook in self.message_hooks: + hook(self, msg) + + if (msg.get_signed() and + self.mav.signing.link_id == 0 and + msg.get_link_id() != 0 and + self.target_system == msg.get_srcSystem() and + self.target_component == msg.get_srcComponent()): + # change to link_id from incoming packet + self.mav.signing.link_id = msg.get_link_id() + + + def packet_loss(self): + '''packet loss as a percentage''' + if self.mav_count == 0: + return 0 + return (100.0*self.mav_loss)/(self.mav_count+self.mav_loss) + + + def recv_msg(self): + '''message receive routine''' + self.pre_message() + while True: + n = self.mav.bytes_needed() + s = self.recv(n) + numnew = len(s) + + if numnew != 0: + if self.logfile_raw: + self.logfile_raw.write(s) + if self.first_byte: + self.auto_mavlink_version(s) + + # We always call parse_char even if the new string is empty, because the existing message buf might already have some valid packet + # we can extract + msg = self.mav.parse_char(s) + if msg: + if self.logfile and msg.get_type() != 'BAD_DATA' : + usec = int(time.time() * 1.0e6) & ~3 + self.logfile.write(struct.pack('>Q', usec) + msg.get_msgbuf()) + self.post_message(msg) + return msg + else: + # if we failed to parse any messages _and_ no new bytes arrived, return immediately so the client has the option to + # timeout + if numnew == 0: + return None + + def recv_match(self, condition=None, type=None, blocking=False, timeout=None): + '''recv the next MAVLink message that matches the given condition + type can be a string or a list of strings''' + if type is not None and not isinstance(type, list) and not isinstance(type, set): + type = [type] + start_time = time.time() + while True: + if timeout is not None: + now = time.time() + if now < start_time: + start_time = now # If an external process rolls back system time, we should not spin forever. + if start_time + timeout < time.time(): + return None + m = self.recv_msg() + if m is None: + if blocking: + for hook in self.idle_hooks: + hook(self) + if timeout is None: + self.select(0.05) + else: + self.select(timeout/2) + continue + return None + if type is not None and not m.get_type() in type: + continue + if not evaluate_condition(condition, self.messages): + continue + return m + + def check_condition(self, condition): + '''check if a condition is true''' + return evaluate_condition(condition, self.messages) + + def mavlink10(self): + '''return True if using MAVLink 1.0 or later''' + return float(self.WIRE_PROTOCOL_VERSION) >= 1 + + def mavlink20(self): + '''return True if using MAVLink 2.0 or later''' + return float(self.WIRE_PROTOCOL_VERSION) >= 2 + + def setup_logfile(self, logfile, mode='wb'): + '''start logging to the given logfile, with timestamps''' + self.logfile = open(logfile, mode=mode) + + def setup_logfile_raw(self, logfile, mode='wb'): + '''start logging raw bytes to the given logfile, without timestamps''' + self.logfile_raw = open(logfile, mode=mode) + + def wait_heartbeat(self, blocking=True, timeout=None): + '''wait for a heartbeat so we know the target system IDs''' + return self.recv_match(type='HEARTBEAT', blocking=blocking, timeout=timeout) + + def param_fetch_all(self): + '''initiate fetch of all parameters''' + if time.time() - self.param_fetch_start < 2.0: + # don't fetch too often + return + self.param_fetch_start = time.time() + self.mav.param_request_list_send(self.target_system, self.target_component) + + def param_fetch_one(self, name): + '''initiate fetch of one parameter''' + try: + idx = int(name) + self.mav.param_request_read_send(self.target_system, self.target_component, b"", idx) + except Exception: + if not isinstance(name, bytes): + name = bytes(name,'ascii') + self.mav.param_request_read_send(self.target_system, self.target_component, name, -1) + + def time_since(self, mtype): + '''return the time since the last message of type mtype was received''' + if not mtype in self.messages: + return time.time() - self.start_time + return time.time() - self.messages[mtype]._timestamp + + def param_set_send(self, parm_name, parm_value, parm_type=None): + '''wrapper for parameter set''' + if self.mavlink10(): + if parm_type is None: + parm_type = mavlink.MAVLINK_TYPE_FLOAT + self.mav.param_set_send(self.target_system, self.target_component, + parm_name.encode('utf8'), parm_value, parm_type) + else: + self.mav.param_set_send(self.target_system, self.target_component, + parm_name.encode('utf8'), parm_value) + + def waypoint_request_list_send(self): + '''wrapper for waypoint_request_list_send''' + if self.mavlink10(): + self.mav.mission_request_list_send(self.target_system, self.target_component) + else: + self.mav.waypoint_request_list_send(self.target_system, self.target_component) + + def waypoint_clear_all_send(self): + '''wrapper for waypoint_clear_all_send''' + if self.mavlink10(): + self.mav.mission_clear_all_send(self.target_system, self.target_component) + else: + self.mav.waypoint_clear_all_send(self.target_system, self.target_component) + + def waypoint_request_send(self, seq): + '''wrapper for waypoint_request_send''' + if self.mavlink10(): + self.mav.mission_request_send(self.target_system, self.target_component, seq) + else: + self.mav.waypoint_request_send(self.target_system, self.target_component, seq) + + def waypoint_set_current_send(self, seq): + '''wrapper for waypoint_set_current_send''' + if self.mavlink10(): + self.mav.mission_set_current_send(self.target_system, self.target_component, seq) + else: + self.mav.waypoint_set_current_send(self.target_system, self.target_component, seq) + + def waypoint_current(self): + '''return current waypoint''' + if self.mavlink10(): + m = self.recv_match(type='MISSION_CURRENT', blocking=True) + else: + m = self.recv_match(type='WAYPOINT_CURRENT', blocking=True) + return m.seq + + def waypoint_count_send(self, seq): + '''wrapper for waypoint_count_send''' + if self.mavlink10(): + self.mav.mission_count_send(self.target_system, self.target_component, seq) + else: + self.mav.waypoint_count_send(self.target_system, self.target_component, seq) + + def set_mode_flag(self, flag, enable): + ''' + Enables/ disables MAV_MODE_FLAG + @param flag The mode flag, + see MAV_MODE_FLAG enum + @param enable Enable the flag, (True/False) + ''' + if self.mavlink10(): + mode = self.base_mode + if enable: + mode = mode | flag + elif not enable: + mode = mode & ~flag + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_DO_SET_MODE, 0, + mode, + 0, 0, 0, 0, 0, 0) + else: + print("Set mode flag not supported") + + def set_mode_auto(self): + '''enter auto mode''' + if self.mavlink10(): + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_MISSION_START, 0, 0, 0, 0, 0, 0, 0, 0) + else: + MAV_ACTION_SET_AUTO = 13 + self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_SET_AUTO) + + def mode_mapping(self): + '''return dictionary mapping mode names to numbers, or None if unknown''' + mav_type = self.sysid_state[self.sysid].mav_type + mav_autopilot = self.sysid_state[self.sysid].mav_autopilot + if mav_autopilot == mavlink.MAV_AUTOPILOT_PX4: + return px4_map + if mav_type is None: + return None + return mode_mapping_byname(mav_type) + + def set_mode_apm(self, mode, custom_mode = 0, custom_sub_mode = 0): + '''enter arbitrary mode''' + if isinstance(mode, str): + mode_map = self.mode_mapping() + if mode_map is None or mode not in mode_map: + print("Unknown mode '%s'" % mode) + return + mode = mode_map[mode] + # set mode by integer mode number for ArduPilot + self.mav.command_long_send(self.target_system, + self.target_component, + mavlink.MAV_CMD_DO_SET_MODE, + 0, + mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, + mode, + 0, + 0, + 0, + 0, + 0) + + def set_mode_px4(self, mode, custom_mode, custom_sub_mode): + '''enter arbitrary mode''' + if isinstance(mode, str): + mode_map = self.mode_mapping() + if mode_map is None or mode not in mode_map: + print("Unknown mode '%s'" % mode) + return + # PX4 uses two fields to define modes + mode, custom_mode, custom_sub_mode = px4_map[mode] + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_DO_SET_MODE, 0, mode, custom_mode, custom_sub_mode, 0, 0, 0, 0) + + def set_mode(self, mode, custom_mode = 0, custom_sub_mode = 0): + '''set arbitrary flight mode''' + mav_autopilot = self.field('HEARTBEAT', 'autopilot', None) + if mav_autopilot == mavlink.MAV_AUTOPILOT_PX4: + self.set_mode_px4(mode, custom_mode, custom_sub_mode) + else: + self.set_mode_apm(mode) + + def set_mode_rtl(self): + '''enter RTL mode''' + if self.mavlink10(): + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH, 0, 0, 0, 0, 0, 0, 0, 0) + else: + MAV_ACTION_RETURN = 3 + self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_RETURN) + + def set_mode_manual(self): + '''enter MANUAL mode''' + if self.mavlink10(): + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_DO_SET_MODE, 0, + mavlink.MAV_MODE_MANUAL_ARMED, + 0, 0, 0, 0, 0, 0) + else: + MAV_ACTION_SET_MANUAL = 12 + self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_SET_MANUAL) + + def set_mode_fbwa(self): + '''enter FBWA mode''' + if self.mavlink10(): + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_DO_SET_MODE, 0, + mavlink.MAV_MODE_STABILIZE_ARMED, + 0, 0, 0, 0, 0, 0) + else: + print("Forcing FBWA not supported") + + def set_mode_loiter(self): + '''enter LOITER mode''' + if self.mavlink10(): + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_NAV_LOITER_UNLIM, 0, 0, 0, 0, 0, 0, 0, 0) + else: + MAV_ACTION_LOITER = 27 + self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_LOITER) + + def set_servo(self, channel, pwm): + '''set a servo value''' + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_DO_SET_SERVO, 0, + channel, pwm, + 0, 0, 0, 0, 0) + + + def set_relay(self, relay_pin=0, state=True): + '''Set relay_pin to value of state''' + if self.mavlink10(): + self.mav.command_long_send( + self.target_system, # target_system + self.target_component, # target_component + mavlink.MAV_CMD_DO_SET_RELAY, # command + 0, # Confirmation + relay_pin, # Relay Number + int(state), # state (1 to indicate arm) + 0, # param3 (all other params meaningless) + 0, # param4 + 0, # param5 + 0, # param6 + 0) # param7 + else: + print("Setting relays not supported.") + + def calibrate_level(self): + '''calibrate accels (1D version)''' + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_PREFLIGHT_CALIBRATION, 0, + 1, 1, 0, 0, 0, 0, 0) + + def calibrate_pressure(self): + '''calibrate pressure''' + if self.mavlink10(): + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_PREFLIGHT_CALIBRATION, 0, + 0, 0, 1, 0, 0, 0, 0) + else: + MAV_ACTION_CALIBRATE_PRESSURE = 20 + self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_CALIBRATE_PRESSURE) + + def reboot_autopilot(self, hold_in_bootloader=False, force=False): + '''reboot the autopilot''' + if self.mavlink10(): + if hold_in_bootloader: + param1 = 3 + else: + param1 = 1 + if force: + param6 = 20190226 + else: + param6 = 0 + self.mav.command_long_send(self.target_system, self.target_component, + mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, 0, + param1, 0, 0, 0, 0, param6, 0) + + def wait_gps_fix(self): + self.recv_match(type='VFR_HUD', blocking=True) + if self.mavlink10(): + self.recv_match(type='GPS_RAW_INT', blocking=True, + condition='GPS_RAW_INT.fix_type>=3 and GPS_RAW_INT.lat != 0') + else: + self.recv_match(type='GPS_RAW', blocking=True, + condition='GPS_RAW.fix_type>=2 and GPS_RAW.lat != 0') + + def location(self, relative_alt=False): + '''return current location''' + self.wait_gps_fix() + # wait for another VFR_HUD, to ensure we have correct altitude + self.recv_match(type='VFR_HUD', blocking=True) + self.recv_match(type='GLOBAL_POSITION_INT', blocking=True) + if relative_alt: + alt = self.messages['GLOBAL_POSITION_INT'].relative_alt*0.001 + else: + alt = self.messages['VFR_HUD'].alt + return location(self.messages['GPS_RAW_INT'].lat*1.0e-7, + self.messages['GPS_RAW_INT'].lon*1.0e-7, + alt, + self.messages['VFR_HUD'].heading) + + def arducopter_arm(self): + '''arm motors (arducopter only)''' + if self.mavlink10(): + self.mav.command_long_send( + self.target_system, # target_system + self.target_component, + mavlink.MAV_CMD_COMPONENT_ARM_DISARM, # command + 0, # confirmation + 1, # param1 (1 to indicate arm) + 0, # param2 (all other params meaningless) + 0, # param3 + 0, # param4 + 0, # param5 + 0, # param6 + 0) # param7 + + def arducopter_disarm(self): + '''disarm motors (arducopter only)''' + if self.mavlink10(): + self.mav.command_long_send( + self.target_system, # target_system + self.target_component, + mavlink.MAV_CMD_COMPONENT_ARM_DISARM, # command + 0, # confirmation + 0, # param1 (0 to indicate disarm) + 0, # param2 (all other params meaningless) + 0, # param3 + 0, # param4 + 0, # param5 + 0, # param6 + 0) # param7 + + def motors_armed(self): + '''return true if motors armed''' + return self.sysid_state[self.sysid].armed + + def motors_armed_wait(self): + '''wait for motors to be armed''' + while True: + m = self.wait_heartbeat() + if self.motors_armed(): + return + + def motors_disarmed_wait(self): + '''wait for motors to be disarmed''' + while True: + m = self.wait_heartbeat() + if not self.motors_armed(): + return + + + def field(self, type, field, default=None): + '''convenient function for returning an arbitrary MAVLink + field with a default''' + if not type in self.messages: + return default + return getattr(self.messages[type], field, default) + + def param(self, name, default=None): + '''convenient function for returning an arbitrary MAVLink + parameter with a default''' + if not name in self.params: + return default + return self.params[name] + + def setup_signing(self, secret_key, sign_outgoing=True, allow_unsigned_callback=None, initial_timestamp=None, link_id=None): + '''setup for MAVLink2 signing''' + self.mav.signing.secret_key = secret_key + self.mav.signing.sign_outgoing = sign_outgoing + self.mav.signing.allow_unsigned_callback = allow_unsigned_callback + if link_id is None: + # auto-increment the link_id for each link + global global_link_id + link_id = global_link_id + global_link_id = min(global_link_id + 1, 255) + self.mav.signing.link_id = link_id + if initial_timestamp is None: + # timestamp is time since 1/1/2015 + epoch_offset = 1420070400 + now = max(time.time(), epoch_offset) + initial_timestamp = now - epoch_offset + initial_timestamp = int(initial_timestamp * 100 * 1000) + # initial_timestamp is in 10usec units + self.mav.signing.timestamp = initial_timestamp + + def disable_signing(self): + '''disable MAVLink2 signing''' + self.mav.signing.secret_key = None + self.mav.signing.sign_outgoing = False + self.mav.signing.allow_unsigned_callback = None + self.mav.signing.link_id = 0 + self.mav.signing.timestamp = 0 + +def set_close_on_exec(fd): + '''set the close on exec flag on a file descriptor. Ignore exceptions''' + try: + import fcntl + flags = fcntl.fcntl(fd, fcntl.F_GETFD) + flags |= fcntl.FD_CLOEXEC + fcntl.fcntl(fd, fcntl.F_SETFD, flags) + except Exception: + pass + +class FakeSerial(): + def __init__(self): + pass + def read(self, len): + return "" + def write(self, buf): + raise Exception("write always fails") + def inWaiting(self): + return 0 + def close(self): + pass + +class mavserial(mavfile): + '''a serial mavlink port''' + def __init__(self, device, baud=115200, autoreconnect=False, source_system=255, source_component=0, use_native=default_native, force_connected=False): + import serial + if ',' in device and not os.path.exists(device): + device, baud = device.split(',') + self.baud = baud + self.device = device + self.autoreconnect = autoreconnect + self.force_connected = force_connected + # we rather strangely set the baudrate initially to 1200, then change to the desired + # baudrate. This works around a kernel bug on some Linux kernels where the baudrate + # is not set correctly + try: + self.port = serial.Serial(self.device, 1200, timeout=0, + dsrdtr=False, rtscts=False, xonxoff=False) + except serial.SerialException as e: + if not force_connected: + raise e + self.port = FakeSerial() + + try: + fd = self.port.fileno() + set_close_on_exec(fd) + except Exception: + fd = None + self.set_baudrate(self.baud) + mavfile.__init__(self, fd, device, source_system=source_system, source_component=source_component, use_native=use_native) + self.rtscts = False + + def set_rtscts(self, enable): + '''enable/disable RTS/CTS if applicable''' + try: + self.port.setRtsCts(enable) + except Exception: + self.port.rtscts = enable + self.rtscts = enable + + def set_baudrate(self, baudrate): + '''set baudrate''' + try: + self.port.setBaudrate(baudrate) + except Exception: + # for pySerial 3.0, which doesn't have setBaudrate() + self.port.baudrate = baudrate + + def close(self): + self.port.close() + + def recv(self,n=None): + if n is None: + n = self.mav.bytes_needed() + if self.fd is None: + waiting = self.port.inWaiting() + if waiting < n: + n = waiting + ret = self.port.read(n) + return ret + + def write(self, buf): + try: + return self.port.write(bytes(buf)) + except Exception: + if not self.portdead: + print("Device %s is dead" % self.device) + self.portdead = True + if self.autoreconnect: + self.reset() + return -1 + + def reset(self): + import serial + try: + try: + newport = serial.Serial(self.device, self.baud, timeout=0, + dsrdtr=False, rtscts=False, xonxoff=False) + except serial.SerialException as e: + if not self.force_connected: + raise e + newport = FakeSerial() + return False + self.port.close() + self.port = newport + print("Device %s reopened OK" % self.device) + self.portdead = False + try: + self.fd = self.port.fileno() + except Exception: + self.fd = None + self.set_baudrate(self.baud) + if self.rtscts: + self.set_rtscts(self.rtscts) + return True + except Exception: + return False + + +class mavudp(mavfile): + '''a UDP mavlink socket''' + def __init__(self, device, input=True, broadcast=False, source_system=255, source_component=0, use_native=default_native, timeout=0): + a = device.split(':') + if len(a) != 2: + raise ValueError("UDP ports must be specified as host:port") + self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.udp_server = input + self.broadcast = False + if input: + self.port.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.port.bind((a[0], int(a[1]))) + else: + self.destination_addr = (a[0], int(a[1])) + if broadcast: + self.port.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + self.broadcast = True + set_close_on_exec(self.port.fileno()) + self.port.setblocking(0) + self.last_address = None + self.timeout = timeout + self.clients = set() + self.clients_last_alive = {} + self.resolved_destination_addr = None + mavfile.__init__(self, self.port.fileno(), device, source_system=source_system, source_component=source_component, input=input, use_native=use_native) + + def close(self): + self.port.close() + + def recv(self,n=None): + try: + data, new_addr = self.port.recvfrom(UDP_MAX_PACKET_LEN) + except socket.error as e: + if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNREFUSED ]: + return "" + raise + if self.udp_server: + self.clients.add(new_addr) + self.clients_last_alive[new_addr] = time.time() + elif self.broadcast: + self.last_address = new_addr + return data + + def write(self, buf): + try: + if self.udp_server: + current_time = time.time() + to_remove = set() + for address in self.clients: + if len(self.clients) == 1 or self.timeout <= 0 or self.clients_last_alive[address] + self.timeout > current_time: + self.port.sendto(buf, address) + elif len(self.clients) > 1 and len(to_remove) < len(self.clients) - 1: + # we keep always at least 1 client, so we don't break old behavior + to_remove.add(address) + self.clients_last_alive.pop(address) + self.clients -= to_remove + else: + if self.last_address and self.broadcast: + self.destination_addr = self.last_address + self.broadcast = False + self.port.connect(self.destination_addr) + # turn a (possible) hostname into an IP address to + # avoid resolving the hostname for every packet sent: + if self.destination_addr[0] != self.resolved_destination_addr: + self.resolved_destination_addr = self.destination_addr[0] + self.destination_addr = (socket.gethostbyname(self.destination_addr[0]), self.destination_addr[1]) + self.port.sendto(buf, self.destination_addr) + except socket.error: + pass + + def recv_msg(self): + '''message receive routine for UDP link''' + self.pre_message() + s = self.recv() + if len(s) > 0: + if self.first_byte: + self.auto_mavlink_version(s) + + m = self.mav.parse_char(s) + if m is not None: + self.post_message(m) + + return m + +class mavmcast(mavfile): + '''a UDP multicast mavlink socket''' + def __init__(self, device, broadcast=False, source_system=255, source_component=0, use_native=default_native): + a = device.split(':') + mcast_ip = "239.255.145.50" + mcast_port = 14550 + if len(a) == 1 and len(a[0]) > 0: + mcast_port = int(a[0]) + elif len(a) > 1: + mcast_ip = a[0] + mcast_port = int(a[1]) + + # first the receiving socket. We use separate sending and receiving + # sockets so we can use the port number of the sending socket to detect + # packets from ourselves + self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + self.port.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if platform.system() == "Windows": + # on windows we need to bind to INADDR_ANY first, then use multicast join for address + self.port.bind(("0.0.0.0", mcast_port)) + else: + self.port.bind((mcast_ip, mcast_port)) + mreq = struct.pack("4sl", socket.inet_aton(mcast_ip), socket.INADDR_ANY) + self.port.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) + self.port.setblocking(0) + set_close_on_exec(self.port.fileno()) + + # now the sending socket + self.port_out = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.port_out.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.port_out.setblocking(0) + self.port_out.connect((mcast_ip, mcast_port)) + set_close_on_exec(self.port_out.fileno()) + self.myport = None + + mavfile.__init__(self, self.port.fileno(), device, + source_system=source_system, source_component=source_component, + input=False, use_native=use_native) + + def close(self): + self.port.close() + self.port_out.close() + + def recv(self,n=None): + try: + data, new_addr = self.port.recvfrom(UDP_MAX_PACKET_LEN) + if self.myport is None: + try: + (myaddr,self.myport) = self.port_out.getsockname() + except Exception: + pass + except socket.error as e: + if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNREFUSED ]: + return "" + raise + if self.myport == new_addr[1]: + # data from ourselves, discard + return '' + return data + + def write(self, buf): + try: + self.port_out.send(buf) + except socket.error as e: + pass + + def recv_msg(self): + '''message receive routine for UDP link''' + self.pre_message() + s = self.recv() + if len(s) > 0: + if self.first_byte: + self.auto_mavlink_version(s) + + m = self.mav.parse_char(s) + if m is not None: + self.post_message(m) + + return m + + +class mavtcp(mavfile): + '''a TCP mavlink socket''' + def __init__(self, + device, + autoreconnect=False, + source_system=255, + source_component=0, + retries=6, + use_native=default_native): + a = device.split(':') + if len(a) != 2: + raise ValueError("TCP ports must be specified as host:port") + self.destination_addr = (a[0], int(a[1])) + + self.autoreconnect = autoreconnect + + self.retries = retries + self.do_connect() + + mavfile.__init__(self, self.port.fileno(), "tcp:" + device, source_system=source_system, source_component=source_component, use_native=use_native) + + def do_connect(self): + if sys.platform != 'darwin': + self.port = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + retries = self.retries + if retries <= 0: + # try to connect at least once: + retries = 1 + while retries >= 0: + retries -= 1 + try: + if sys.platform == 'darwin': + self.port = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.port.connect(self.destination_addr) + break + except Exception as e: + if retries == 0: + if self.port is not None: + self.port.close() + self.port = None + raise e + print(e, "sleeping") + time.sleep(1) + self.port.setblocking(0) + set_close_on_exec(self.port.fileno()) + self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + + def close(self): + self.port.close() + + def handle_disconnect(self): + print("Connection reset or closed by peer on TCP socket") + self.reconnect() + + def handle_eof(self): + # EOF + print("EOF on TCP socket") + self.reconnect() + + def recv(self,n=None): + if self.port is None: + self.reconnect() + if n is None: + n = self.mav.bytes_needed() + try: + data = self.port.recv(n) + except socket.error as e: + if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]: + return "" + if e.errno in [ errno.ECONNRESET, errno.EPIPE ]: + self.handle_disconnect() + raise + if len(data) == 0: + self.handle_eof() + + return data + + def write(self, buf): + if self.port is None: + try: + self.reconnect() + except socket.error as e: + pass + if self.port is None: + return + try: + self.port.send(buf) + except socket.error as e: + if e.errno in [ errno.ECONNRESET, errno.EPIPE ]: + self.handle_disconnect() + pass + + def reconnect(self): + if self.autoreconnect: + print("Attempting reconnect") + if self.port is not None: + self.port.close() + self.port = None + self.do_connect() + + +class mavtcpin(mavfile): + '''a TCP input mavlink socket''' + def __init__(self, device, source_system=255, source_component=0, retries=3, use_native=default_native): + a = device.split(':') + if len(a) != 2: + raise ValueError("TCP ports must be specified as host:port") + self.listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.listen_addr = (a[0], int(a[1])) + self.listen.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.listen.bind(self.listen_addr) + self.listen.listen(1) + self.listen.setblocking(0) + set_close_on_exec(self.listen.fileno()) + self.listen.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + mavfile.__init__(self, self.listen.fileno(), "tcpin:" + device, source_system=source_system, source_component=source_component, use_native=use_native) + self.port = None + + def close(self): + if self.port is not None: + self.port.close() + self.listen.close() + + def recv(self,n=None): + if not self.port: + try: + (self.port, addr) = self.listen.accept() + except Exception: + return '' + self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + self.port.setblocking(0) + set_close_on_exec(self.port.fileno()) + self.fd = self.port.fileno() + + if n is None: + n = self.mav.bytes_needed() + try: + data = self.port.recv(n) + except socket.error as e: + if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]: + return "" + self.port.close() + self.port = None + self.fd = self.listen.fileno() + return '' + return data + + def write(self, buf): + if self.port is None: + return + try: + self.port.send(buf) + except socket.error as e: + if e.errno in [ errno.EPIPE ]: + self.port.close() + self.port = None + self.fd = self.listen.fileno() + pass + + +class mavlogfile(mavfile): + '''a MAVLink logfile reader/writer''' + def __init__(self, filename, planner_format=None, + write=False, append=False, + robust_parsing=True, notimestamps=False, source_system=255, source_component=0, use_native=default_native): + self.filename = filename + self.writeable = write + self.robust_parsing = robust_parsing + self.planner_format = planner_format + self._two64 = math.pow(2.0, 63) + mode = 'rb' + if self.writeable: + if append: + mode = 'ab' + else: + mode = 'wb' + self.f = open(filename, mode) + self.filesize = os.path.getsize(filename) + self.percent = 0 + mavfile.__init__(self, None, filename, source_system=source_system, source_component=source_component, notimestamps=notimestamps, use_native=use_native) + if self.notimestamps: + self._timestamp = 0 + else: + self._timestamp = time.time() + self.stop_on_EOF = True + self._last_message = None + self._last_timestamp = None + self._link = 0 + + def close(self): + self.f.close() + + def recv(self,n=None): + if n is None: + n = self.mav.bytes_needed() + return self.f.read(n) + + def write(self, buf): + self.f.write(buf) + + def scan_timestamp(self, tbuf): + '''scan forward looking in a tlog for a timestamp in a reasonable range''' + while True: + (tusec,) = struct.unpack('>Q', tbuf) + t = tusec * 1.0e-6 + if abs(t - self._last_timestamp) <= 3*24*60*60: + break + c = self.f.read(1) + if len(c) != 1: + break + tbuf = tbuf[1:] + c + return t + + + def pre_message(self): + '''read timestamp if needed''' + # read the timestamp + if self.filesize != 0: + self.percent = (100.0 * self.f.tell()) / self.filesize + if self.notimestamps: + return + if self.planner_format: + tbuf = self.f.read(21) + if len(tbuf) != 21 or tbuf[0] != '-' or tbuf[20] != ':': + raise RuntimeError('bad planner timestamp %s' % tbuf) + hnsec = self._two64 + float(tbuf[0:20]) + t = hnsec * 1.0e-7 # convert to seconds + t -= 719163 * 24 * 60 * 60 # convert to 1970 base + self._link = 0 + else: + tbuf = self.f.read(8) + if len(tbuf) != 8: + return + (tusec,) = struct.unpack('>Q', tbuf) + t = tusec * 1.0e-6 + if (self._last_timestamp is not None and + self._last_message.get_type() == "BAD_DATA" and + abs(t - self._last_timestamp) > 3*24*60*60): + t = self.scan_timestamp(tbuf) + self._link = tusec & 0x3 + self._timestamp = t + + def post_message(self, msg): + '''add timestamp to message''' + # read the timestamp + super(mavlogfile, self).post_message(msg) + if self.planner_format: + self.f.read(1) # trailing newline + self.timestamp = msg._timestamp + self._last_message = msg + if msg.get_type() != "BAD_DATA": + self._last_timestamp = msg._timestamp + msg._link = self._link + + +class mavmmaplog(mavlogfile): + '''a MAVLink log file accessed via mmap. Used for fast read-only + access with low memory overhead where particular message types are wanted''' + def __init__(self, filename, progress_callback=None): + import mmap + mavlogfile.__init__(self, filename) + self.f.seek(0, 2) + self.data_len = self.f.tell() + self.f.seek(0) + self.data_map = None + if self.data_len != 0: + if platform.system() == "Windows": + self.data_map = mmap.mmap(self.f.fileno(), self.data_len, None, mmap.ACCESS_READ) + else: + self.data_map = mmap.mmap(self.f.fileno(), self.data_len, mmap.MAP_PRIVATE, mmap.PROT_READ) + self._rewind() + self.init_arrays(progress_callback) + self._flightmodes = None + + def _rewind(self): + '''rewind to start of log''' + self.flightmode = "UNKNOWN" + self.offset = 0 + self.type_nums = None + self.f.seek(0) + + def rewind(self): + '''rewind to start of log''' + self._rewind() + + def close(self): + super(mavmmaplog, self).close() + if self.data_map is not None: + self.data_map.close() + + def init_arrays(self, progress_callback=None): + '''initialise arrays for fast recv_match()''' + + # dictionary indexed by msgid, mapping to arrays of file offsets where + # each instance of a msg type is found + self.offsets = {} + + # number of msgs of each msg type + self.counts = {} + self._count = 0 + + # mapping from msg name to msg id + self.name_to_id = {} + + # mapping from msg id to name + self.id_to_name = {} + + self.instance_offsets = {} + self.instance_lengths = {} + + self.type_nums = None + + ofs = 0 + pct = 0 + + MARKER_V1 = 0xFE + MARKER_V2 = 0xFD + + while ofs+8+6 < self.data_len: + marker = u_ord(self.data_map[ofs+8]) + mlen = u_ord(self.data_map[ofs+9]) + 8 + if marker == MARKER_V1: + mtype = u_ord(self.data_map[ofs+13]) + mlen += 8 + data_ofs = 14 + elif marker == MARKER_V2: + if ofs+8+10 > self.data_len: + break + mtype = u_ord(self.data_map[ofs+15]) | (u_ord(self.data_map[ofs+16])<<8) | (u_ord(self.data_map[ofs+17])<<16) + mlen += 12 + data_ofs = 18 + incompat_flags = u_ord(self.data_map[ofs+10]) + if incompat_flags & mavlink.MAVLINK_IFLAG_SIGNED: + mlen += mavlink.MAVLINK_SIGNATURE_BLOCK_LEN + else: + # unrecognised marker; probably a malformed log + ofs += 1 + continue + + if not mtype in self.offsets: + if not mtype in mavlink.mavlink_map: + ofs += mlen + continue + self.offsets[mtype] = [] + self.counts[mtype] = 0 + msg = mavlink.mavlink_map[mtype] + self.name_to_id[msg.msgname] = mtype + self.id_to_name[mtype] = msg.msgname + self.f.seek(ofs) + m = self.recv_msg() + add_message(self.messages, msg.msgname, m) + if m._instance_field is not None: + instance_idx = m.ordered_fieldnames.index(m._instance_field) + self.instance_offsets[mtype] = m._instance_offset + alen = m.array_lengths[instance_idx] + if alen > 0: + self.instance_lengths[mtype] = alen + else: + self.instance_lengths[mtype] = 1 + + if mtype in self.instance_offsets: + # populate the messages array with a new instance. This assumes we can get the instance + # as a single byte integer + instance_field_ofs = ofs + data_ofs + self.instance_offsets[mtype] + if instance_field_ofs >= self.data_len: + # truncated log + break + self.f.seek(instance_field_ofs) + ilen = self.instance_lengths[mtype] + ipad = 0 + if ilen + (instance_field_ofs - ofs) > mlen-2: + # message is MAVLink2.0 zero truncated + ipad = ilen + (instance_field_ofs - ofs) - (mlen-2) + ilen -= ipad + if ilen > 0: + b = self.f.read(ilen) + else: + b = bytes([0]*ilen) + if ipad > 0: + b += bytes([0]*ipad) + if ilen+ipad > 1: + # assume string + while len(b) > 0 and b[-1] == 0: + b = b[:-1] + instance = b.decode('ASCII',errors='ignore').rstrip() + else: + instance, = struct.unpack('b', b[:1]) + mname = self.id_to_name[mtype] + if mname in self.messages: + iname = "%s[%s]" % (mname, str(instance)) + self.messages[iname] = self.messages[mname] + + self.offsets[mtype].append(ofs) + self.counts[mtype] += 1 + + ofs += mlen + new_pct = (100 * ofs) // self.data_len + if progress_callback is not None and new_pct != pct: + progress_callback(new_pct) + pct = new_pct + + for mtype in self.counts: + self._count += self.counts[mtype] + self.offset = 0 + self._rewind() + + def skip_to_type(self, type): + '''skip fwd to next msg matching given type set''' + if self.data_map is None: + return + if self.type_nums is None: + # always add some key msg types so we can track flightmode, params etc + type = type.copy() + type.update(set(['HEARTBEAT','PARAM_VALUE'])) + self.indexes = [] + self.type_nums = [] + for t in type: + if not t in self.name_to_id: + continue + self.type_nums.append(self.name_to_id[t]) + self.indexes.append(0) + smallest_index = -1 + smallest_offset = self.data_len + for i in range(len(self.type_nums)): + mtype = self.type_nums[i] + if self.indexes[i] >= self.counts[mtype]: + continue + ofs = self.offsets[mtype][self.indexes[i]] + if ofs < smallest_offset: + smallest_offset = ofs + smallest_index = i + if smallest_index >= 0: + self.indexes[smallest_index] += 1 + self.offset = smallest_offset + self.f.seek(smallest_offset) + + def recv_match(self, condition=None, type=None, blocking=False, timeout=None): + '''recv the next message that matches the given condition + type can be a string or a list of strings''' + if type is not None: + if isinstance(type, str): + type = set([type]) + elif isinstance(type, list): + type = set(type) + while True: + if type is not None: + self.skip_to_type(type) + m = self.recv_msg() + if m is None: + if blocking: + for hook in self.idle_hooks: + hook(self) + if timeout is None: + self.select(0.05) + else: + self.select(timeout/2) + continue + return None + if type is not None and not m.get_type() in type: + continue + if not evaluate_condition(condition, self.messages): + continue + return m + + def flightmode_list(self): + '''return an array of tuples for all flightmodes in log. Tuple is (modestring, t0, t1)''' + tstamp = None + fmode = None + if self._flightmodes is None: + self._rewind() + self._flightmodes = [] + types = set(['HEARTBEAT']) + while True: + m = self.recv_match(type=types) + if m is None: + break + tstamp = m._timestamp + if self.flightmode == fmode: + continue + if len(self._flightmodes) > 0: + (mode, t0, t1) = self._flightmodes[-1] + self._flightmodes[-1] = (mode, t0, tstamp) + self._flightmodes.append((self.flightmode, tstamp, None)) + fmode = self.flightmode + if tstamp is not None: + (mode, t0, t1) = self._flightmodes[-1] + self._flightmodes[-1] = (mode, t0, tstamp) + + self._rewind() + return self._flightmodes + +class mavchildexec(mavfile): + '''a MAVLink child processes reader/writer''' + def __init__(self, filename, source_system=255, source_component=0, use_native=default_native): + from subprocess import Popen, PIPE + import fcntl + + self.filename = filename + self.child = Popen(filename, shell=False, stdout=PIPE, stdin=PIPE, bufsize=0) + self.fd = self.child.stdout.fileno() + + fl = fcntl.fcntl(self.fd, fcntl.F_GETFL) + fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + + fl = fcntl.fcntl(self.child.stdout.fileno(), fcntl.F_GETFL) + fcntl.fcntl(self.child.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK) + + mavfile.__init__(self, self.fd, filename, source_system=source_system, source_component=source_component, use_native=use_native) + + def close(self): + self.child.close() + + def recv(self,n=None): + try: + x = self.child.stdout.read(1) + except Exception: + return '' + return x + + def write(self, buf): + self.child.stdin.write(buf) + +class mavwebsocket(mavfile): + '''Mavlink WebSocket server, single client only''' + def __init__(self, device, source_system=255, source_component=0, use_native=default_native): + # Try importing wsproto, we don't need it here but it means we fail early if its missing + import wsproto + + self.ws = None + + # This is a duplicate of mavtcpin + a = device.split(':') + if len(a) != 2: + raise ValueError("WebSocket ports must be specified as host:port") + self.listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.listen_addr = (a[0], int(a[1])) + self.listen.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.listen.bind(self.listen_addr) + self.listen.listen(1) + self.listen.setblocking(0) + set_close_on_exec(self.listen.fileno()) + self.listen.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + mavfile.__init__(self, self.listen.fileno(), "wsserver:" + device, source_system=source_system, source_component=source_component, use_native=use_native) + self.port = None + + def close_port(self): + self.port.close() + self.port = None + self.fd = self.listen.fileno() + self.ws = None + + def close(self): + if self.port is not None: + self.close_port() + self.listen.close() + + def recv(self,n=None): + from wsproto import ConnectionType, WSConnection, utilities + from wsproto.events import ( + AcceptConnection, + CloseConnection, + Request, + BytesMessage, + ) + + # Based on: https://github.com/python-hyper/wsproto/blob/main/example/synchronous_server.py + if not self.port: + try: + (self.port, addr) = self.listen.accept() + except Exception: + return '' + self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + self.port.setblocking(0) + set_close_on_exec(self.port.fileno()) + self.fd = self.port.fileno() + + # Start server + self.ws = WSConnection(ConnectionType.SERVER) + + if not self.ws: + # Should probbily raise a exception of some sort + return '' + + # Read in some data and pass it to the WebSocket handeler + RECEIVE_BYTES = 4096 + try: + in_data = self.port.recv(RECEIVE_BYTES) + self.ws.receive_data(in_data) + except socket.error as e: + if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]: + return '' + self.close_port() + return '' + except utilities.RemoteProtocolError: + self.close_port() + return '' + + # Procces WebSocket events + data = b"" + reply = b"" + keep_running = True + for event in self.ws.events(): + if isinstance(event, Request): + # Negotiate new WebSocket connection + reply += self.ws.send(AcceptConnection()) + + elif isinstance(event, CloseConnection): + # Request to close + reply += self.ws.send(event.response()) + keep_running = False + + elif isinstance(event, BytesMessage): + # Some actual MAVLink data + data += event.data + + if len(reply) > 0: + # Send any reply to incomming requests + self.port.send(reply) + + if not keep_running: + self.close_port() + + # Return the extracted data + return data + + def write(self, buf): + if self.port is None or self.ws is None: + return + + from wsproto.events import ( + BytesMessage, + ) + + # Pack buf into WebSocket binary message + packed = self.ws.send(BytesMessage(data = buf)) + + try: + self.port.send(packed) + except socket.error as e: + if e.errno in [ errno.EPIPE ]: + self.close_port() + pass + + +def mavlink_connection(device, baud=115200, source_system=255, source_component=0, + planner_format=None, write=False, append=False, + robust_parsing=True, notimestamps=False, input=True, + dialect=None, autoreconnect=False, zero_time_base=False, + retries=3, use_native=default_native, + force_connected=False, progress_callback=None, + udp_timeout=0, **opts): + '''open a serial, UDP, TCP or file mavlink connection''' + global mavfile_global + + if force_connected: + # force_connected implies autoreconnect + autoreconnect = True + + if dialect is not None: + set_dialect(dialect) + if device.startswith('tcp:'): + return mavtcp(device[4:], + autoreconnect=autoreconnect, + source_system=source_system, + source_component=source_component, + retries=retries, + use_native=use_native) + if device.startswith('tcpin:'): + return mavtcpin(device[6:], source_system=source_system, source_component=source_component, retries=retries, use_native=use_native) + if device.startswith('udpin:'): + return mavudp(device[6:], input=True, source_system=source_system, source_component=source_component, use_native=use_native, timeout=udp_timeout) + if device.startswith('udpout:'): + return mavudp(device[7:], input=False, source_system=source_system, source_component=source_component, use_native=use_native) + if device.startswith('udpbcast:'): + return mavudp(device[9:], input=False, source_system=source_system, source_component=source_component, use_native=use_native, broadcast=True) + if device.startswith('wsserver:'): + return mavwebsocket(device[9:], source_system=source_system, source_component=source_component, use_native=use_native) + # For legacy purposes we accept the following syntax and let the caller to specify direction + if device.startswith('udp:'): + return mavudp(device[4:], input=input, source_system=source_system, source_component=source_component, use_native=use_native) + if device.startswith('mcast:'): + return mavmcast(device[6:], source_system=source_system, source_component=source_component, use_native=use_native) + + if device.lower().endswith('.bin') or device.lower().endswith('.px4log'): + # support dataflash logs + from pymavlink import DFReader + m = DFReader.DFReader_binary(device, zero_time_base=zero_time_base, progress_callback=progress_callback) + mavfile_global = m + return m + + if device.lower().startswith('csv:'): + # support CSV logs + from pymavlink import CSVReader + # special-case for users wanting a : separator: + colon_separator_re = "" + if re.match(".*separator=::?.*", device): + opts["separator"] = ":" + device = re.sub(":separator=:", "", device) + components = device.split(":") + filename = components[1] + for nv in components[2:]: + (name, value) = nv.split('=') + opts[name] = value + m = CSVReader.CSVReader(filename, + zero_time_base=zero_time_base, + progress_callback=progress_callback, + **opts) + mavfile_global = m + return m + + if device.endswith('.log'): + # support dataflash text logs + from pymavlink import DFReader + if DFReader.DFReader_is_text_log(device): + m = DFReader.DFReader_text(device, zero_time_base=zero_time_base, progress_callback=progress_callback) + mavfile_global = m + return m + + # list of suffixes to prevent setting DOS paths as UDP sockets + logsuffixes = ['mavlink', 'log', 'raw', 'tlog' ] + suffix = device.split('.')[-1].lower() + if device.find(':') != -1 and not suffix in logsuffixes: + return mavudp(device, source_system=source_system, source_component=source_component, input=input, use_native=use_native) + if os.path.isfile(device): + if device.endswith(".elf") or device.find("/bin/") != -1: + print("executing '%s'" % device) + return mavchildexec(device, source_system=source_system, source_component=source_component, use_native=use_native) + elif not write and not append and not notimestamps: + return mavmmaplog(device, progress_callback=progress_callback) + else: + return mavlogfile(device, planner_format=planner_format, write=write, + append=append, robust_parsing=robust_parsing, notimestamps=notimestamps, + source_system=source_system, source_component=source_component, use_native=use_native) + return mavserial(device, + baud=baud, + source_system=source_system, + source_component=source_component, + autoreconnect=autoreconnect, + use_native=use_native, + force_connected=force_connected) + +class periodic_event(object): + '''a class for fixed frequency events''' + def __init__(self, frequency): + self.frequency = float(frequency) + self.last_time = time.time() + + def force(self): + '''force immediate triggering''' + self.last_time = 0 + + def trigger(self): + '''return True if we should trigger now''' + tnow = time.time() + + if tnow < self.last_time: + print("Warning, time moved backwards. Restarting timer.") + self.last_time = tnow + + if self.last_time + (1.0/self.frequency) <= tnow: + self.last_time = tnow + return True + return False + + +try: + from curses import ascii + have_ascii = True +except: + have_ascii = False + +def is_printable(c): + '''see if a character is printable''' + if have_ascii: + return ascii.isprint(c) + if isinstance(c, int): + ic = c + else: + ic = ord(c) + return ic >= 32 and ic <= 126 + +def all_printable(buf): + '''see if a string is all printable''' + for c in buf: + if not is_printable(c) and not c in ['\r', '\n', '\t'] and not c in [ord('\r'), ord('\n'), ord('\t')]: + return False + return True + +class SerialPort(object): + '''auto-detected serial port''' + def __init__(self, device, description=None, hwid=None): + self.device = device + self.description = description + self.hwid = hwid + + def __str__(self): + ret = self.device + if self.description is not None: + ret += " : " + self.description + if self.hwid is not None: + ret += " : " + self.hwid + return ret + +def auto_detect_serial_win32(preferred_list=['*']): + '''try to auto-detect serial ports on win32''' + try: + from serial.tools.list_ports_windows import comports + list = sorted(comports()) + except: + return [] + ret = [] + others = [] + for port, description, hwid in list: + matches = False + p = SerialPort(port, description=description, hwid=hwid) + for preferred in preferred_list: + if fnmatch.fnmatch(description, preferred) or fnmatch.fnmatch(hwid, preferred): + matches = True + if matches: + ret.append(p) + else: + others.append(p) + if len(ret) > 0: + return ret + # now the rest + ret.extend(others) + return ret + + + + +def auto_detect_serial_unix(preferred_list=['*']): + '''try to auto-detect serial ports on unix''' + import glob + glist = glob.glob('/dev/ttyS*') + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*') + glob.glob('/dev/serial/by-id/*') + ret = [] + others = [] + # try preferred ones first + for d in glist: + matches = False + for preferred in preferred_list: + if fnmatch.fnmatch(d, preferred): + matches = True + if matches: + ret.append(SerialPort(d)) + else: + others.append(SerialPort(d)) + if len(ret) > 0: + return ret + ret.extend(others) + return ret + +def auto_detect_serial(preferred_list=['*']): + '''try to auto-detect serial port''' + # see if + if os.name == 'nt': + return auto_detect_serial_win32(preferred_list=preferred_list) + return auto_detect_serial_unix(preferred_list=preferred_list) + +def mode_string_v09(msg): + '''mode string for 0.9 protocol''' + mode = msg.mode + nav_mode = msg.nav_mode + + MAV_MODE_UNINIT = 0 + MAV_MODE_MANUAL = 2 + MAV_MODE_GUIDED = 3 + MAV_MODE_AUTO = 4 + MAV_MODE_TEST1 = 5 + MAV_MODE_TEST2 = 6 + MAV_MODE_TEST3 = 7 + + MAV_NAV_GROUNDED = 0 + MAV_NAV_LIFTOFF = 1 + MAV_NAV_HOLD = 2 + MAV_NAV_WAYPOINT = 3 + MAV_NAV_VECTOR = 4 + MAV_NAV_RETURNING = 5 + MAV_NAV_LANDING = 6 + MAV_NAV_LOST = 7 + MAV_NAV_LOITER = 8 + + cmode = (mode, nav_mode) + mapping = { + (MAV_MODE_UNINIT, MAV_NAV_GROUNDED) : "INITIALISING", + (MAV_MODE_MANUAL, MAV_NAV_VECTOR) : "MANUAL", + (MAV_MODE_TEST3, MAV_NAV_VECTOR) : "CIRCLE", + (MAV_MODE_GUIDED, MAV_NAV_VECTOR) : "GUIDED", + (MAV_MODE_TEST1, MAV_NAV_VECTOR) : "STABILIZE", + (MAV_MODE_TEST2, MAV_NAV_LIFTOFF) : "FBWA", + (MAV_MODE_AUTO, MAV_NAV_WAYPOINT) : "AUTO", + (MAV_MODE_AUTO, MAV_NAV_RETURNING) : "RTL", + (MAV_MODE_AUTO, MAV_NAV_LOITER) : "LOITER", + (MAV_MODE_AUTO, MAV_NAV_LIFTOFF) : "TAKEOFF", + (MAV_MODE_AUTO, MAV_NAV_LANDING) : "LANDING", + (MAV_MODE_AUTO, MAV_NAV_HOLD) : "LOITER", + (MAV_MODE_GUIDED, MAV_NAV_VECTOR) : "GUIDED", + (MAV_MODE_GUIDED, MAV_NAV_WAYPOINT) : "GUIDED", + (100, MAV_NAV_VECTOR) : "STABILIZE", + (101, MAV_NAV_VECTOR) : "ACRO", + (102, MAV_NAV_VECTOR) : "ALT_HOLD", + (107, MAV_NAV_VECTOR) : "CIRCLE", + (109, MAV_NAV_VECTOR) : "LAND", + } + if cmode in mapping: + return mapping[cmode] + return "Mode(%s,%s)" % cmode + +mode_mapping_apm = { + 0 : 'MANUAL', + 1 : 'CIRCLE', + 2 : 'STABILIZE', + 3 : 'TRAINING', + 4 : 'ACRO', + 5 : 'FBWA', + 6 : 'FBWB', + 7 : 'CRUISE', + 8 : 'AUTOTUNE', + 10 : 'AUTO', + 11 : 'RTL', + 12 : 'LOITER', + 13 : 'TAKEOFF', + 14 : 'AVOID_ADSB', + 15 : 'GUIDED', + 16 : 'INITIALISING', + 17 : 'QSTABILIZE', + 18 : 'QHOVER', + 19 : 'QLOITER', + 20 : 'QLAND', + 21 : 'QRTL', + 22 : 'QAUTOTUNE', + 23 : 'QACRO', + 24 : 'THERMAL', + 25 : 'LOITERALTQLAND', + 26 : 'AUTOLAND', +} + +mode_mapping_acm = { + 0 : 'STABILIZE', + 1 : 'ACRO', + 2 : 'ALT_HOLD', + 3 : 'AUTO', + 4 : 'GUIDED', + 5 : 'LOITER', + 6 : 'RTL', + 7 : 'CIRCLE', + 8 : 'POSITION', + 9 : 'LAND', + 10 : 'OF_LOITER', + 11 : 'DRIFT', + 13 : 'SPORT', + 14 : 'FLIP', + 15 : 'AUTOTUNE', + 16 : 'POSHOLD', + 17 : 'BRAKE', + 18 : 'THROW', + 19 : 'AVOID_ADSB', + 20 : 'GUIDED_NOGPS', + 21 : 'SMART_RTL', + 22 : 'FLOWHOLD', + 23 : 'FOLLOW', + 24 : 'ZIGZAG', + 25 : 'SYSTEMID', + 26 : 'AUTOROTATE', + 27 : 'AUTO_RTL', +} + +mode_mapping_rover = { + 0 : 'MANUAL', + 1 : 'ACRO', + 2 : 'LEARNING', + 3 : 'STEERING', + 4 : 'HOLD', + 5 : 'LOITER', + 6 : 'FOLLOW', + 7 : 'SIMPLE', + 8 : 'DOCK', + 9 : 'CIRCLE', + 10 : 'AUTO', + 11 : 'RTL', + 12 : 'SMART_RTL', + 15 : 'GUIDED', + 16 : 'INITIALISING' +} + +mode_mapping_tracker = { + 0 : 'MANUAL', + 1 : 'STOP', + 2 : 'SCAN', + 4 : 'GUIDED', + 10 : 'AUTO', + 16 : 'INITIALISING' +} + +mode_mapping_sub = { + 0: 'STABILIZE', + 1: 'ACRO', + 2: 'ALT_HOLD', + 3: 'AUTO', + 4: 'GUIDED', + 7: 'CIRCLE', + 9: 'SURFACE', + 16: 'POSHOLD', + 19: 'MANUAL', +} + +mode_mapping_blimp = { + 0 : 'LAND', + 1 : 'MANUAL', + 2 : 'VELOCITY', + 3 : 'LOITER', + 4 : 'RTL', +} + +AP_MAV_TYPE_MODE_MAP_DEFAULT = { + # copter + mavlink.MAV_TYPE_HELICOPTER: mode_mapping_acm, + mavlink.MAV_TYPE_TRICOPTER: mode_mapping_acm, + mavlink.MAV_TYPE_QUADROTOR: mode_mapping_acm, + mavlink.MAV_TYPE_HEXAROTOR: mode_mapping_acm, + mavlink.MAV_TYPE_OCTOROTOR: mode_mapping_acm, + mavlink.MAV_TYPE_DECAROTOR: mode_mapping_acm, + mavlink.MAV_TYPE_DODECAROTOR: mode_mapping_acm, + mavlink.MAV_TYPE_COAXIAL: mode_mapping_acm, + # plane + mavlink.MAV_TYPE_FIXED_WING: mode_mapping_apm, + mavlink.MAV_TYPE_VTOL_TILTROTOR: mode_mapping_apm, + # rover + mavlink.MAV_TYPE_GROUND_ROVER: mode_mapping_rover, + # boat + mavlink.MAV_TYPE_SURFACE_BOAT: mode_mapping_rover, # for the time being + # tracker + mavlink.MAV_TYPE_ANTENNA_TRACKER: mode_mapping_tracker, + # sub + mavlink.MAV_TYPE_SUBMARINE: mode_mapping_sub, + # blimp + mavlink.MAV_TYPE_AIRSHIP: mode_mapping_blimp, +} + +# cope with enumeration renaming: +for (name, some_map) in ([ + # plane, see https://github.com/ArduPilot/pymavlink/issues/571 + ("MAV_TYPE_VTOL_DUOROTOR", mode_mapping_apm), + ("MAV_TYPE_VTOL_TAILSITTER_DUOROTOR", mode_mapping_apm), + ("MAV_TYPE_VTOL_QUADROTOR", mode_mapping_apm), + ("MAV_TYPE_VTOL_TAILSITTER_QUADROTOR", mode_mapping_apm), + ]): + try: + AP_MAV_TYPE_MODE_MAP_DEFAULT[getattr(mavlink, name)] = some_map + except AttributeError: + pass + +try: + # Allow for using custom mode maps by importing a JSON dict from + # "~/.pymavlink/custom_mode_map.json" and using it to extend the hard-coded + # AP_MAV_TYPE_MODE_MAP_DEFAULT dict. + from os.path import expanduser + + _custom_mode_map_path = os.path.join("~", ".pymavlink", "custom_mode_map.json") + _custom_mode_map_path = expanduser(_custom_mode_map_path) + try: + with open(_custom_mode_map_path) as f: + _json_mode_map = json.load(f) + except json.decoder.JSONDecodeError as ex: + # inform the user of a malformed custom_mode_map.json + print("Error: pymavlink custom mode file ('" + _custom_mode_map_path + "') is not valid JSON.") + raise + except Exception: + # file is not present, fall back to using default map + raise + + try: + _custom_mode_map = {} + for mav_type, mode_map in _json_mode_map.items(): + # make sure the custom map has the right datatypes + _custom_mode_map[int(mav_type)] = { int(mode_num): str(mode_name) for mode_num, mode_name in mode_map.items() } + except Exception: + # inform the user of invalid custom mode map + print("Error: invalid pymavlink custom mode map dict in " + _custom_mode_map_path) + raise + + AP_MAV_TYPE_MODE_MAP = AP_MAV_TYPE_MODE_MAP_DEFAULT.copy() + AP_MAV_TYPE_MODE_MAP.update(_custom_mode_map) +except Exception: + # revert to using default mode map + AP_MAV_TYPE_MODE_MAP = AP_MAV_TYPE_MODE_MAP_DEFAULT + + +# map from a PX4 "main_state" to a string; see msg/commander_state.msg +# This allows us to map sdlog STAT.MainState to a simple "mode" +# string, used in DFReader and possibly other places. These are +# related but distict from what is found in mavlink messages; see +# "Custom mode definitions", below. +mainstate_mapping_px4 = { + 0 : 'MANUAL', + 1 : 'ALTCTL', + 2 : 'POSCTL', + 3 : 'AUTO_MISSION', + 4 : 'AUTO_LOITER', + 5 : 'AUTO_RTL', + 6 : 'ACRO', + 7 : 'OFFBOARD', + 8 : 'STAB', + 9 : 'RATTITUDE', + 10 : 'AUTO_TAKEOFF', + 11 : 'AUTO_LAND', + 12 : 'AUTO_FOLLOW_TARGET', + 13 : 'MAX', +} +def mode_string_px4(MainState): + return mainstate_mapping_px4.get(MainState, "Unknown") + + +# Custom mode definitions from PX4 +PX4_CUSTOM_MAIN_MODE_MANUAL = 1 +PX4_CUSTOM_MAIN_MODE_ALTCTL = 2 +PX4_CUSTOM_MAIN_MODE_POSCTL = 3 +PX4_CUSTOM_MAIN_MODE_AUTO = 4 +PX4_CUSTOM_MAIN_MODE_ACRO = 5 +PX4_CUSTOM_MAIN_MODE_OFFBOARD = 6 +PX4_CUSTOM_MAIN_MODE_STABILIZED = 7 +PX4_CUSTOM_MAIN_MODE_RATTITUDE = 8 + +PX4_CUSTOM_SUB_MODE_OFFBOARD = 0 +PX4_CUSTOM_SUB_MODE_AUTO_READY = 1 +PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF = 2 +PX4_CUSTOM_SUB_MODE_AUTO_LOITER = 3 +PX4_CUSTOM_SUB_MODE_AUTO_MISSION = 4 +PX4_CUSTOM_SUB_MODE_AUTO_RTL = 5 +PX4_CUSTOM_SUB_MODE_AUTO_LAND = 6 +PX4_CUSTOM_SUB_MODE_AUTO_RTGS = 7 +PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET = 8 + +auto_mode_flags = mavlink.MAV_MODE_FLAG_AUTO_ENABLED \ + | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED \ + | mavlink.MAV_MODE_FLAG_GUIDED_ENABLED + +px4_map = { "MANUAL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_MANUAL, 0 ), + "STABILIZED": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_STABILIZED, 0 ), + "ACRO": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_ACRO, 0 ), + "RATTITUDE": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_RATTITUDE, 0 ), + "ALTCTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_ALTCTL, 0 ), + "POSCTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_POSCTL, 0 ), + "LOITER": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_LOITER ), + "MISSION": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_MISSION ), + "RTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_RTL ), + "LAND": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_LAND ), + "RTGS": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_RTGS ), + "FOLLOWME": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET ), + "OFFBOARD": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_OFFBOARD, 0 ), + "TAKEOFF": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF )} + + +def interpret_px4_mode(base_mode, custom_mode): + custom_main_mode = (custom_mode & 0xFF0000) >> 16 + custom_sub_mode = (custom_mode & 0xFF000000) >> 24 + + if base_mode & mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED != 0: #manual modes + if custom_main_mode == PX4_CUSTOM_MAIN_MODE_MANUAL: + return "MANUAL" + elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_ACRO: + return "ACRO" + elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_RATTITUDE: + return "RATTITUDE" + elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_STABILIZED: + return "STABILIZED" + elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_ALTCTL: + return "ALTCTL" + elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_POSCTL: + return "POSCTL" + elif (base_mode & auto_mode_flags) == auto_mode_flags: #auto modes + if custom_main_mode & PX4_CUSTOM_MAIN_MODE_AUTO != 0: + if custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_MISSION: + return "MISSION" + elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF: + return "TAKEOFF" + elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_LOITER: + return "LOITER" + elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET: + return "FOLLOWME" + elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_RTL: + return "RTL" + elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_LAND: + return "LAND" + elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_RTGS: + return "RTGS" + elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_OFFBOARD: + return "OFFBOARD" + return "UNKNOWN" + +def mode_mapping_byname(mav_type): + '''return dictionary mapping mode names to numbers, or None if unknown''' + mode_map = mode_mapping_bynumber(mav_type) + if mode_map is None: + return None + inv_map = dict((a, b) for (b, a) in mode_map.items()) + return inv_map + +def mode_mapping_bynumber(mav_type): + '''return dictionary mapping mode numbers to name, or None if unknown''' + return AP_MAV_TYPE_MODE_MAP[mav_type] if mav_type in AP_MAV_TYPE_MODE_MAP else None + + +def mode_string_v10(msg): + '''mode string for 1.0 protocol, from heartbeat''' + if msg.autopilot == mavlink.MAV_AUTOPILOT_PX4: + if msg.get_type() == "HIGH_LATENCY2": + return "Mode(%u)" % msg.custom_mode + + return interpret_px4_mode(msg.base_mode, msg.custom_mode) + + if msg.get_type() != 'HIGH_LATENCY2' and not msg.base_mode & mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED: + return "Mode(0x%08x)" % msg.base_mode + + mode_map = mode_mapping_bynumber(msg.type) + if mode_map and msg.custom_mode in mode_map: + return mode_map[msg.custom_mode] + return "Mode(%u)" % msg.custom_mode + +def mode_string_apm(mode_number): + '''return mode string for ArduPlane''' + if mode_number in mode_mapping_apm: + return mode_mapping_apm[mode_number] + return "Mode(%u)" % mode_number + +def mode_string_acm(mode_number): + '''return mode string for ArduCopter''' + if mode_number in mode_mapping_acm: + return mode_mapping_acm[mode_number] + return "Mode(%u)" % mode_number + +class MavlinkSerialPort(object): + '''an object that looks like a serial port, but + transmits using mavlink SERIAL_CONTROL packets''' + def __init__(self, portname, baudrate, devnum=0, devbaud=0, timeout=3, debug=0): + from . import mavutil + + self.baudrate = 0 + self.timeout = timeout + self._debug = debug + self.buf = bytearray() + self.port = devnum + self.debug("Connecting with MAVLink to %s ..." % portname) + self.mav = mavutil.mavlink_connection(portname, autoreconnect=True, baud=baudrate) + self.mav.wait_heartbeat() + self.debug("HEARTBEAT OK\n") + if devbaud != 0: + self.setBaudrate(devbaud) + self.debug("Locked serial device\n") + + def debug(self, s, level=1): + '''write some debug text''' + if self._debug >= level: + print(s) + + def write(self, b): + '''write some bytes''' + from . import mavutil + while len(b) > 0: + n = len(b) + if n > 70: + n = 70 + buf = bytearray(b[:]) + buf.extend([0]*(70-len(buf))) + self.mav.mav.serial_control_send(self.port, + mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE | + mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND, + 0, + 0, + n, + buf) + b = b[n:] + + def _recv(self): + '''read some bytes into self.buf''' + from . import mavutil + start_time = time.time() + while time.time() < start_time + self.timeout: + m = self.mav.recv_match(condition='SERIAL_CONTROL.count!=0', + type='SERIAL_CONTROL', blocking=False, timeout=0) + if m is not None and m.count != 0: + break + self.mav.mav.serial_control_send(self.port, + mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE | + mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND, + 0, + 0, + 0, [0]*70) + m = self.mav.recv_match(condition='SERIAL_CONTROL.count!=0', + type='SERIAL_CONTROL', blocking=True, timeout=0.01) + if m is not None and m.count != 0: + break + if m is not None: + if self._debug > 2: + print(m) + data = m.data[:m.count] + self.buf.extend(data) + + def read(self, n): + '''read some bytes''' + if len(self.buf) == 0: + self._recv() + if len(self.buf) > 0: + if n > len(self.buf): + n = len(self.buf) + ret = self.buf[:n] + self.buf = self.buf[n:] + return ret + return bytearray() + + def flushInput(self): + '''flush any pending input''' + self.buf = bytearray() + saved_timeout = self.timeout + self.timeout = 0.5 + self._recv() + self.timeout = saved_timeout + self.buf = bytearray() + self.debug("flushInput") + + def setBaudrate(self, baudrate): + '''set baudrate''' + from . import mavutil + if self.baudrate == baudrate: + return + self.baudrate = baudrate + self.mav.mav.serial_control_send(self.port, + mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE, + 0, + self.baudrate, + 0, [0]*70) + self.flushInput() + self.debug("Changed baudrate %u" % self.baudrate) + +def decode_bitmask(messagetype, field, value): + try: + _class = eval("mavlink.MAVLink_%s_message" % messagetype.lower()) + except AttributeError as e: + raise AttributeError("No such message type") + + try: + display = _class.fielddisplays_by_name[field] + except KeyError as e: + raise AttributeError("Not a bitmask field (none specified)") + + if display != "bitmask": + raise ValueError("Not a bitmask field") + + try: + enum_name = _class.fieldenums_by_name[field] + except KeyError as e: + raise AttributeError("No enum found for bitmask field") + + try: + enum = mavlink.enums[enum_name] + except KeyError as e: + raise AttributeError("Did not find specified enumeration (%s)" % enum_name) + + class EnumBitInfo(object): + def __init__(self, offset, value, name): + self.offset = offset + self.value = value + self.name = name + + ret = [] + for i in range(0, 64): + bit_value = (1 << i) + try: + enum_entry = enum[bit_value] + enum_entry_name = enum_entry.name + except KeyError as e: + enum_entry_name = None + if value == 0: + continue + if value & bit_value: + ret.append( EnumBitInfo(i, True, enum_entry_name) ) + value = value & ~bit_value + else: + ret.append( EnumBitInfo(i, False, enum_entry_name) ) + return ret + +'''lookup table to map from a raw unit into a divisor and output unit''' +dump_message_unit_decoder = { + "d%": [10.0, "%"], + "c%": [100.0, "%"], + "cA": [100.0, "A"], + "cdegC": [100.0, "degC"], + "cdeg": [100.0, "deg"], + "degE5": [100000.0, "deg"], + "degE7": [10000000.0, "deg"], + "mG": [1000.0, "G"], + "mrad/s": [1000.0, "rad/s"], + "mV": [1000.0, "V"] +} + +def dump_message_verbose(f, m): + '''write an excruciatingly detailed dump of message m to file descriptor f''' + try: + # __getattr__ may be overridden on m, thus this try/except + timestamp = m._timestamp + except AttributeError as e: + timestamp = "" + if timestamp != "": + timestamp = "%s.%02u: " % ( + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)), + int(timestamp*100.0)%100) + f.write("%s%s (id=%u) (link=%s) (signed=%s) (seq=%u) (src=%u/%u)\n" % (timestamp, m.get_type(), m.get_msgId(), str(m.get_link_id()), str(m.get_signed()), m.get_seq(), m.get_srcSystem(), m.get_srcComponent())) + for fieldname in m.get_fieldnames(): + + # format in those most boring way possible: + value = m.format_attr(fieldname) + + # try to add units: + try: + units = m.fieldunits_by_name[fieldname] + + # perform simple unit conversions: + if units in dump_message_unit_decoder: + divisor = dump_message_unit_decoder[units][0] + units = dump_message_unit_decoder[units][1] + if type(value) == list: + value = [x/divisor for x in value] + else: + value = value / divisor + + # append degrees to fields in radians: + if units == "rad": + value = "%s%s (%sdeg)" % (value, units, math.degrees(value)) + elif units == "rad/s": + value = "%s%s (%sdeg/s)" % (value, units, math.degrees(value)) + elif units == "rad/s/s": + value = "%s%s (%sdeg/s/s)" % (value, units, math.degrees(value)) + + # append local time if us represents unix time: + elif units == "us": + if value > (1<<50): + local_time = time.localtime(int(value/1000000)) + time_str = time.strftime("%Y-%m-%d %H:%M:%S", local_time) + tm_zone = local_time.tm_zone + value = "%s%s (%s.%06d %s)" % (value, units, time_str, value%1000000, tm_zone) + elif value > 1000000: + value = "%s%s (%ss)" % (value, units, value / 1000000.0) + else: + value = "%s%s" % (value, units) + + # by default, just append the unit: + else: + value = "%s%s" % (value, units) + except AttributeError as e: + # e.g. BAD_DATA + pass + except KeyError as e: + pass + + # format any bitmask enumerations: + try: + display = m.fielddisplays_by_name[fieldname] if fieldname in m.fielddisplays_by_name else "" + if display == "bitmask": + # Display bitmasks as hex (dec), regardless of whether there is an enum + f.write(" %s: 0x%x (%d)\n" % (fieldname, value, value)) + enum_name = m.fieldenums_by_name[fieldname] if fieldname in m.fieldenums_by_name else None + if enum_name is not None: + bits = decode_bitmask(m.get_type(), fieldname, value) + for bit in bits: + value = bit.value + name = bit.name + svalue = " " + if not value: + svalue = "!" + if name is None: + name = "[UNKNOWN]" + f.write(" %s %s\n" % (svalue, name)) + continue +# except NameError as e: +# pass + except AttributeError as e: + # e.g. BAD_DATA + pass + except KeyError as e: + pass + + # add any enumeration name: + try: + enum_name = m.fieldenums_by_name[fieldname] + try: + enum_value = mavlink.enums[enum_name][value].name + value = "%s (%s)" % (value, enum_value) + except KeyError as e: + value = "%s (%s)" % (value, "[UNKNOWN]") + except AttributeError as e: + # e.g. BAD_DATA + pass + except KeyError as e: + pass + + f.write(" %s: %s\n" % (fieldname, value)) + + +if __name__ == '__main__': + serial_list = auto_detect_serial(preferred_list=['*FTDI*',"*Arduino_Mega_2560*", "*3D_Robotics*", "*USB_to_UART*", '*PX4*', '*FMU*']) + for port in serial_list: + print("%s" % port) \ No newline at end of file diff --git a/src/unitdev02/unitdev02/serial_udp_ad.py b/src/unitdev02/unitdev02/serial_udp_ad.py new file mode 100644 index 0000000..04d228e --- /dev/null +++ b/src/unitdev02/unitdev02/serial_udp_ad.py @@ -0,0 +1,71 @@ +import asyncio +import serial_asyncio + +SERIAL_PORT = '/dev/ttyUSB0' # 修改為你的 serial port +SERIAL_BAUDRATE = 115200 # 修改為你的 baudrate +UDP_REMOTE_IP = '192.168.1.100' # 修改為目標 IP +UDP_REMOTE_PORT = 5005 # 修改為目標 port +UDP_LOCAL_PORT = 5006 # 本地 UDP 監聽 port + +class SerialToUDP(asyncio.Protocol): + def __init__(self, udp_transport): + self.udp_transport = udp_transport + + def connection_made(self, transport): + self.transport = transport + + def data_received(self, data): + # Serial 收到資料,轉發到 UDP + self.udp_transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) + + def write_to_serial(self, data): + self.transport.write(data) + +class UDPToSerial(asyncio.DatagramProtocol): + def __init__(self, serial_proto): + self.serial_proto = serial_proto + + def datagram_received(self, data, addr): + # UDP 收到資料,轉發到 Serial + self.serial_proto.write_to_serial(data) + +async def main(): + loop = asyncio.get_running_loop() + + # 定義協議工廠函數 + def create_empty_protocol(): + return asyncio.DatagramProtocol() + + # 建立 UDP1 傳輸 + udp_transport, _ = await loop.create_datagram_endpoint( + create_empty_protocol, + local_addr=('0.0.0.0', UDP_LOCAL_PORT) + ) + + # 建立 Serial 傳輸 + serial_proto = SerialToUDP(udp_transport) + def get_serial_protocol(): + return serial_proto + + _, serial_transport = await serial_asyncio.create_serial_connection( + loop, get_serial_protocol, SERIAL_PORT, baudrate=SERIAL_BAUDRATE + ) + + # 建立 UDP2 監聽 + udp_proto = UDPToSerial(serial_proto) + def get_udp_protocol(): + return udp_proto + + udp_listen_transport, _ = await loop.create_datagram_endpoint( + get_udp_protocol, + local_addr=('0.0.0.0', UDP_LOCAL_PORT) + ) + + # 保持運行 + try: + await asyncio.Future() + except asyncio.CancelledError: + pass + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/src/unitdev02/unitdev02/sslChech.sh b/src/unitdev02/unitdev02/sslChech.sh deleted file mode 100644 index 9f38904..0000000 --- a/src/unitdev02/unitdev02/sslChech.sh +++ /dev/null @@ -1,27 +0,0 @@ -#!/bin/bash - -# 網站清單 -DOMAINS=("google.com" "smarter.nchu.edu.tw") - -echo "網站 SSL 憑證剩餘天數:" -echo "---------------------------" - -for domain in "${DOMAINS[@]}"; do - end_date=$(echo | openssl s_client -servername "$domain" -connect "$domain:443" 2>/dev/null | - openssl x509 -noout -enddate | cut -d= -f2) - - end_timestamp=$(date -d "$end_date" +%s) - now_timestamp=$(date +%s) - - remaining_days=$(( (end_timestamp - now_timestamp) / 86400 )) - - if [ $remaining_days -lt 0 ]; then - status="已過期 ❌" - elif [ $remaining_days -lt 15 ]; then - status="即將到期 ⚠️" - else - status="正常 ✅" - fi - - printf "%-20s 到期日:%-25s 剩餘天數:%3d 天 %s\n" "$domain" "$end_date" "$remaining_days" "$status" -done diff --git a/src/unitdev02/unitdev02/udp_unix_ad.py b/src/unitdev02/unitdev02/udp_unix_ad.py new file mode 100644 index 0000000..acc9e62 --- /dev/null +++ b/src/unitdev02/unitdev02/udp_unix_ad.py @@ -0,0 +1,40 @@ +import socket +import sys +import select + +# 設定來源 IP 與 Port +SRC_IP = '127.0.0.1' # 監聽所有介面 +SRC_PORT = 16661 # 請自行修改 + +# 建立 UDP 監聽 socket +src_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +src_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +src_sock.bind((SRC_IP, SRC_PORT)) +print(f"Listening for UDP on {SRC_IP}:{SRC_PORT}") + +# 設定目標 Unix socket 路徑 +UNIX_SOCKET_PATH = '/tmp/unix_socket_mavlink.sock' # 請自行修改 + +# 建立 Unix socket 連線 +unix_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) +try: + unix_sock.connect(UNIX_SOCKET_PATH) +except Exception as e: + print(f"Failed to connect to unix socket: {e}") + sys.exit(1) + +while True: + # 使用 select 監聽兩個 socket + readable, _, _ = select.select([src_sock, unix_sock], [], []) + for sock in readable: + if sock is src_sock: + data, addr = src_sock.recvfrom(4096) + if data: + unix_sock.sendall(data) + # print(f"Received UDP data from {addr}: {data}") # debug + # break # debug + elif sock is unix_sock: + data = unix_sock.recv(4096) + if data: + # 回送到最近收到資料的 UDP client + src_sock.sendto(data, addr) \ No newline at end of file