Python examples:
Here you will find some python examples to display data streamed with websocket. Both asynchronous and synchronous examples are provided.
Asynchronous
The following script is showing how to use websockets with python.
Subscription: The scripts subscribe to navigation_state, and battery_voltage topics after establishing the WebSocket connection.
*Publishing Data: The scripts demonstrate publishing data to a topic named some_topic.
Unsubscription: The scripts unsubscribe from the previously subscribed topics.
import asyncio
import websockets
import json
from typing import Dict, Any
async def listen() -> None:
"""
Connect to the WebSocket server and continuously listen for messages.
Prints the name, type, and data from each received message.
"""
uri: str = "ws://<ROBOT_IP>:8001"
async with websockets.connect(uri) as websocket:
# Subscribe to topics
await websocket.send(json.dumps({"action": "subscribe", "topic": "navigation_state"}))
await websocket.send(json.dumps({"action": "subscribe", "topic": "battery_voltage"}))
# Example of publishing data to a topic
await websocket.send(json.dumps({
"action": "publish",
"topic": "some_topic",
"data": {"example_key": "example_value"}
}))
try:
await asyncio.wait_for(receive_messages(websocket), timeout=5.0)
except asyncio.TimeoutError:
print("Finished listening for 5 seconds.")
# Unsubscribe from topics
await websocket.send(json.dumps({"action": "unsubscribe", "topic": "navigation_state"}))
await websocket.send(json.dumps({"action": "unsubscribe", "topic": "battery_voltage"}))
async def receive_messages(websocket) -> None:
async for message in websocket:
data: Dict[str, Any] = json.loads(message)
print(f"[Name]: {data['name']}\n[Type]: {data['type']}\n[Data]: {data['data']}\n")
if __name__ == "__main__":
asyncio.run(listen())
Synchronous
The following script is showing how to use websockets with python.
Subscription: The scripts subscribe to navigation_state, and battery_voltage topics after establishing the WebSocket connection.
*Publishing Data: The scripts demonstrate publishing data to a topic named some_topic.
Unsubscription: The scripts unsubscribe from the previously subscribed topics.
Note: You will first need to install
websocketslibrary with the command:
import json
import websockets
import threading
import time
from typing import Dict, Any
def on_message(ws: websocket.WebSocketApp, message: str) -> None:
"""
Callback function that processes received messages.
Args:
ws (websocket.WebSocketApp): The WebSocketApp instance.
message (str): The received message as a JSON string.
"""
data: Dict[str, Any] = json.loads(message)
print(f"[Name]: {data['name']}\n[Type]: {data['type']}\n[Data]: {data['data']}\n")
def on_error(ws: websocket.WebSocketApp, error: str) -> None:
"""
Callback function that handles errors.
Args:
ws (websocket.WebSocketApp): The WebSocketApp instance.
error (str): The error message.
"""
print(f"Error: {error}")
def on_close(ws: websocket.WebSocketApp, close_status_code: int, close_msg: str) -> None:
"""
Callback function that handles the WebSocket closing.
Args:
ws (websocket.WebSocketApp): The WebSocketApp instance.
close_status_code (int): The status code for the WebSocket closing.
close_msg (str): The closing message.
"""
print("Connection closed")
def on_open(ws: websocket.WebSocketApp) -> None:
"""
Callback function that handles the WebSocket opening.
Args:
ws (websocket.WebSocketApp): The WebSocketApp instance.
"""
print("Connection opened")
# Subscribe to topics
ws.send(json.dumps({"action": "subscribe", "topic": "navigation_state"}))
ws.send(json.dumps({"action": "subscribe", "topic": "battery_voltage"}))
# Example of publishing data to a topic
ws.send(json.dumps({
"action": "publish",
"topic": "some_topic",
"data": {"example_key": "example_value"}
}))
def run_websocket(ws: websocket.WebSocketApp) -> None:
"""
Run the WebSocket and listen for messages for 5 seconds.
Args:
ws (websocket.WebSocketApp): The WebSocketApp instance.
"""
ws.run_forever()
def stop_websocket(ws: websocket.WebSocketApp) -> None:
"""
Close the WebSocket connection and unsubscribe from topics after 5 seconds.
Args:
ws (websocket.WebSocketApp): The WebSocketApp instance.
"""
time.sleep(5)
ws.send(json.dumps({"action": "unsubscribe", "topic": "navigation_state"}))
ws.send(json.dumps({"action": "unsubscribe", "topic": "battery_voltage"}))
ws.close()
print("Finished listening for 5 seconds and unsubscribed from topics.")
if __name__ == "__main__":
uri: str = "ws://<ROBOT_IP>:8001"
ws: websocket.WebSocketApp = websocket.WebSocketApp(
uri,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
# Run the WebSocket in a separate thread
websocket_thread = threading.Thread(target=run_websocket, args=(ws,))
websocket_thread.start()
# Stop the WebSocket after 5 seconds
stop_thread = threading.Thread(target=stop_websocket, args=(ws,))
stop_thread.start()
websocket_thread.join()
stop_thread.join()