diff --git a/src/unitdev04/coordinator_api.py b/src/unitdev04/coordinator_api.py new file mode 100644 index 0000000..a4c98bf --- /dev/null +++ b/src/unitdev04/coordinator_api.py @@ -0,0 +1,47 @@ +import threading +import time +from digi.xbee.devices import XBeeDevice + +# Configure XBee connection +PORT = "/dev/ttyUSB1" +BAUD_RATE = 57600 + +# Initialize XBee device +device = XBeeDevice(PORT, BAUD_RATE) +device.open() + +# Callback function to handle incoming messages +def data_received_callback(xbee_message): + sender = xbee_message.remote_device.get_64bit_addr() + try: + data = xbee_message.data.decode("utf-8") # Attempt UTF-8 decoding + except UnicodeDecodeError: + data = xbee_message.data.hex() # Fallback to HEX if decoding fails + print(f"\nReceived from {sender}: {data}\nEnter message: ", end="") + +# Register callback for incoming data +device.add_data_received_callback(data_received_callback) + +# Function to send broadcast messages (runs in a separate thread) +def send_messages(): + while True: + user_input = input("\nEnter message: ") # User input + status = device.send_data_broadcast(user_input) # Broadcast message + if status: + print("Broadcast message sent successfully!") + else: + print("Failed to send broadcast message.") + +# Start the message sending thread +send_thread = threading.Thread(target=send_messages, daemon=True) +send_thread.start() + +print("Broadcast chat mode activated. Type a message and press Enter to send.\n") + +# Keep the program running +try: + while True: + time.sleep(0.1) # Reduce CPU usage and ensure stability +except KeyboardInterrupt: + print("\nProgram terminated.") + device.close() diff --git a/src/unitdev04/endpoint_at.py b/src/unitdev04/endpoint_at.py new file mode 100644 index 0000000..b6e0bfb --- /dev/null +++ b/src/unitdev04/endpoint_at.py @@ -0,0 +1,41 @@ +import serial +import threading + +PORT = "COM15" # AT Mode XBee COM Port +BAUD_RATE = 57600 + +# Initialize serial connection +ser = serial.Serial(PORT, BAUD_RATE, timeout=1) + +# Function to receive messages from the Coordinator +def receive_data(): + while True: + data = ser.readline().decode("utf-8", errors="ignore").strip() # Read incoming data + if data: + print(f"\nReceived from Coordinator: {data}") + print("Enter message: ", end="", flush=True) # Keep input prompt intact + +# Function to send messages to the Coordinator +def send_data(): + while True: + user_input = input("\nEnter message: ") # User input + ser.write(user_input.encode() + b'\r') # Send message in AT Mode + print("Message sent.") + +# Start the receiving thread +receive_thread = threading.Thread(target=receive_data, daemon=True) +receive_thread.start() + +# Start the sending thread +send_thread = threading.Thread(target=send_data, daemon=True) +send_thread.start() + +print("AT Mode XBee communication started. Type a message and press Enter to send.\n") + +# Keep the program running +try: + while True: + pass +except KeyboardInterrupt: + print("\nProgram terminated.") + ser.close()