轉二進位封包

lunu
lenting89 1 year ago
parent eecdab8985
commit ff5c7718be

@ -0,0 +1,47 @@
import threading
import time
from digi.xbee.devices import XBeeDevice
# Configure XBee connection
PORT = "/dev/ttyUSB1"
BAUD_RATE = 57600
# Initialize XBee device
device = XBeeDevice(PORT, BAUD_RATE)
device.open()
# Callback function to handle incoming messages
def data_received_callback(xbee_message):
sender = xbee_message.remote_device.get_64bit_addr()
try:
data = xbee_message.data.decode("utf-8") # Attempt UTF-8 decoding
except UnicodeDecodeError:
data = xbee_message.data.hex() # Fallback to HEX if decoding fails
print(f"\nReceived from {sender}: {data}\nEnter message: ", end="")
# Register callback for incoming data
device.add_data_received_callback(data_received_callback)
# Function to send broadcast messages (runs in a separate thread)
def send_messages():
while True:
user_input = input("\nEnter message: ") # User input
status = device.send_data_broadcast(user_input) # Broadcast message
if status:
print("Broadcast message sent successfully!")
else:
print("Failed to send broadcast message.")
# Start the message sending thread
send_thread = threading.Thread(target=send_messages, daemon=True)
send_thread.start()
print("Broadcast chat mode activated. Type a message and press Enter to send.\n")
# Keep the program running
try:
while True:
time.sleep(0.1) # Reduce CPU usage and ensure stability
except KeyboardInterrupt:
print("\nProgram terminated.")
device.close()

@ -0,0 +1,41 @@
import serial
import threading
PORT = "COM15" # AT Mode XBee COM Port
BAUD_RATE = 57600
# Initialize serial connection
ser = serial.Serial(PORT, BAUD_RATE, timeout=1)
# Function to receive messages from the Coordinator
def receive_data():
while True:
data = ser.readline().decode("utf-8", errors="ignore").strip() # Read incoming data
if data:
print(f"\nReceived from Coordinator: {data}")
print("Enter message: ", end="", flush=True) # Keep input prompt intact
# Function to send messages to the Coordinator
def send_data():
while True:
user_input = input("\nEnter message: ") # User input
ser.write(user_input.encode() + b'\r') # Send message in AT Mode
print("Message sent.")
# Start the receiving thread
receive_thread = threading.Thread(target=receive_data, daemon=True)
receive_thread.start()
# Start the sending thread
send_thread = threading.Thread(target=send_data, daemon=True)
send_thread.start()
print("AT Mode XBee communication started. Type a message and press Enter to send.\n")
# Keep the program running
try:
while True:
pass
except KeyboardInterrupt:
print("\nProgram terminated.")
ser.close()
Loading…
Cancel
Save