Core Modules
This module contains the RestAPI class for communicating with the REST API.
- class pymirokai.core.rest_api.RestAPI(api_version='v1', retry_interval=5, uds_path='/tmp/rest_api.sock', use_uds=False)
Bases:
object
Class to handle REST API communication, supporting both Unix Domain Socket (UDS) and TCP/IP.
- Parameters:
api_version (str)
retry_interval (int)
uds_path (str)
use_uds (bool)
- async authenticate()
Authenticate with the REST API and store the JWT token.
Performs an authorization request and extracts the client ID, access level, and token. On success, calls set_jwt_token.
- Return type:
None
- async close()
Close the REST API connection.
- Return type:
None
- async connect()
Check the connection status of the REST API and update the status.
- Return type:
None
- async enable_skill_file(skill_name, enable)
Enable or disable a skill on the robot.
- Parameters:
skill_name (str) – Name of the skill.
enable (bool) – Whether to enable or disable the skill.
- Return type:
dict[str,Any]
- Returns:
dict – The response from the API.
- async get_data_and_update_token_and_client_id(response)
From a response from the API, recursively parse JSON, update client_id and JWT token.
- Return type:
dict[str,Any]
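One reading of "recursively parse JSON" is that string values which themselves hold JSON documents are decoded in turn. The following is a minimal sketch under that assumption. `parse_nested_json` is a hypothetical helper, not part of pymirokai, and it leaves out the client_id and token handling:

```python
import json
from typing import Any


def parse_nested_json(value: Any) -> Any:
    """Decode JSON documents embedded as strings, at any nesting depth."""
    if isinstance(value, str):
        try:
            decoded = json.loads(value)
        except json.JSONDecodeError:
            return value  # plain text, keep as-is
        # Only unwrap containers, so a string such as "5" stays a string.
        if isinstance(decoded, (dict, list)):
            return parse_nested_json(decoded)
        return value
    if isinstance(value, dict):
        return {key: parse_nested_json(item) for key, item in value.items()}
    if isinstance(value, list):
        return [parse_nested_json(item) for item in value]
    return value
```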
- async get_services(system=None, service=None)
Retrieve available services or logs for a specific system/service.
- Parameters:
system (Optional[str]) – Optional system name (“head” or “base”).
service (Optional[str]) – Optional service name to retrieve logs for.
- Return type:
dict[str,Any]
- Returns:
Dictionary describing either available services or logs.
- async list_skill_files()
Retrieve the list of uploaded skills.
- Return type:
dict[str,Any]
- Returns:
dict – The list of skills from the API.
- async remove_skill_file(skill_name)
Remove a skill from the robot.
- Parameters:
skill_name (str) – Name of the skill to remove.
- Return type:
dict[str,Any]
- Returns:
dict – The response from the API.
- async send_command(endpoint, model, jwt_token)
Send a command to the REST API.
- Parameters:
endpoint (str) – Target endpoint (e.g., “get”, “skill/enable”).
model (BaseModel) – Pydantic model representing the payload.
jwt_token (str) – Bearer token for authentication.
- Return type:
dict[str,Any]
- Returns:
Parsed JSON response from the API.
- Raises:
aiohttp.ClientResponseError – If the server returns an error status.
- async setup_connection()
Get the JWT token and client_id, and retry until connected.
- async start(api_url, api_key, set_jwt_token=None, get_jwt_token=None, use_uds=None, ssl_context=None)
Start and maintain the REST API session.
Initializes an aiohttp client session (TCP or UDS) and continuously attempts to connect until a valid JWT token is obtained.
- Parameters:
api_url (str) – Base URL of the API (ignored if using UDS).
api_key (str) – API key for authentication.
set_jwt_token (Optional[callable]) – Callback to store the received JWT token.
get_jwt_token (Optional[callable]) – Callback to retrieve the current JWT token.
use_uds (Optional[bool]) – Whether to use a Unix Domain Socket.
ssl_context (Optional[SSLContext]) – Optional SSL context for HTTPS connections.
- Return type:
None
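The "retry until a valid JWT token is obtained" behaviour can be pictured as a simple polling loop. This is a sketch, not the library's implementation: `retry_until_token`, its `authenticate` argument, and `max_attempts` are hypothetical names introduced for illustration, and treating `ConnectionError` as the retryable failure is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def retry_until_token(
    authenticate: Callable[[], Awaitable[Optional[str]]],
    retry_interval: float = 5.0,
    max_attempts: Optional[int] = None,
) -> Optional[str]:
    """Call `authenticate` until it yields a token or attempts run out."""
    attempts = 0
    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        try:
            token = await authenticate()
        except ConnectionError:
            token = None  # assumed retryable failure: server not reachable yet
        if token:
            return token
        # Wait before the next attempt, mirroring retry_interval.
        await asyncio.sleep(retry_interval)
    return None
```

With `max_attempts=None` the loop never gives up, which matches the "continuously attempts to connect" wording; a bounded value is convenient in tests.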
- async upload_skill_file(skill_path)
Upload a skill to the robot.
- Parameters:
skill_path (str) – Path to the Python skill to upload.
- Return type:
dict[str,Any]
- Returns:
dict – The response from the API.
This module contains the WebSocketAPI class for communicating with the WebSocket.
- class pymirokai.core.websocket_api.WebSocketAPI(retry_interval=5)
Bases:
object
Class to handle WebSocket communication.
- Parameters:
retry_interval (int)
- async close()
Close the WebSocket connection.
- Return type:
None
- async connect()
Maintain a persistent WebSocket connection.
Continuously attempts to connect and listen for messages. Automatically re-subscribes to previous topics after reconnect.
- Return type:
None
- get_data()
Get the latest data received from the WebSocket.
- Return type:
dict[str,Any]
- Returns:
Dict[str, Any] – The latest data.
- async publish(topic, data)
Publish a JSON message to a user-specific topic.
Automatically prefixes the topic with the authenticated client ID.
- Parameters:
topic (str) – Subtopic name (e.g., “alerts” or “status”).
data (str) – JSON-encoded message body.
- Raises:
websockets.ConnectionClosedError – If the connection is closed.
json.JSONDecodeError – If the input data is not valid JSON.
- Return type:
None
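The two behaviours described for publish (validating the JSON body and prefixing the topic with the client ID) can be illustrated in isolation. The sketch below rests on assumptions: the `/` separator and the `{"topic": ..., "data": ...}` envelope are not documented here, and `build_publish_payload` is a hypothetical helper, not a pymirokai function.

```python
import json


def build_publish_payload(client_id: str, topic: str, data: str) -> str:
    """Validate `data` as JSON and wrap it with a client-prefixed topic."""
    # json.loads raises json.JSONDecodeError if `data` is not valid JSON.
    body = json.loads(data)
    # Assumed topic layout: "<client_id>/<subtopic>".
    full_topic = f"{client_id}/{topic}"
    # Assumed message envelope, chosen only for illustration.
    return json.dumps({"topic": full_topic, "data": body})
```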
- register_callback(topic, callback)
Register a callback to be invoked when a topic message arrives.
Supports both synchronous and asynchronous functions.
- Parameters:
topic (str) – Topic name to monitor.
callback (Callable[[dict[str,Any]],None]) – Function or coroutine called with the message dict.
- Return type:
None
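Supporting both synchronous and asynchronous callbacks usually comes down to awaiting the return value only when it is awaitable. The following sketch shows that dispatch pattern with a stand-in class; `CallbackRegistry` and its `dispatch` method are hypothetical and only mirror the documented `register_callback(topic, callback)` signature.

```python
import asyncio
import inspect
from typing import Any, Callable


class CallbackRegistry:
    """Stand-in registry mapping topics to sync or async callbacks."""

    def __init__(self) -> None:
        self._callbacks: dict[str, list[Callable[[dict[str, Any]], Any]]] = {}

    def register_callback(self, topic: str, callback: Callable[[dict[str, Any]], Any]) -> None:
        self._callbacks.setdefault(topic, []).append(callback)

    async def dispatch(self, topic: str, message: dict[str, Any]) -> None:
        """Invoke every callback registered for `topic` with `message`."""
        for callback in self._callbacks.get(topic, []):
            result = callback(message)
            # Coroutine functions return an awaitable; plain functions do not.
            if inspect.isawaitable(result):
                await result
```

A plain function and an `async def` coroutine can then be registered on the same topic and both receive the message dict.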
- async start(ws_url, get_jwt_token, ssl_context=None)
Start the WebSocket connection.
- Parameters:
ws_url (str) – The URL of the WebSocket.
get_jwt_token (callable) – Callback to get the JWT token.
ssl_context (ssl.SSLContext | None) – SSL context to use when connecting over wss.
- Return type:
None
- async subscribe(topic)
Subscribe to a topic.
- Parameters:
topic (str) – The topic to subscribe to.
- Return type:
None
- async unsubscribe(topic)
Unsubscribe from a topic.
- Parameters:
topic (str) – The topic to unsubscribe from.
- Return type:
None
Video API interface for managing robot camera streams.
Provides methods to start, stop, and handle video streams over the network.
- class pymirokai.core.video_api.VideoAPI(display=False, timeout=5000, on_frame=None, reconnect_delay=5.0)
Bases:
object
Handle a single OpenCV-based video stream with optional per-frame callback and auto-reconnect.
- Parameters:
display (bool)
timeout (int)
on_frame (Callable[[cv2.Mat, str], None] | None)
reconnect_delay (float)
- close()
Stop the stream capture and release resources.
- Return type:
None
- connect(stream_url)
Connect to the video stream using OpenCV.
- Parameters:
stream_url (str) – The video stream URL.
- Return type:
bool
- Returns:
bool – True if successfully connected, False otherwise.
- get_current_frame()
Return the most recent captured frame.
- Return type:
Optional[cv2.Mat]
- Returns:
Optional[cv2.Mat] – The latest frame, or None if unavailable.
- set_display(value)
Set whether to display the stream.
- Parameters:
value (bool) – True to display, False to hide.
- Return type:
None
- start(stream_url)
Start the capture loop in a background thread.
- Parameters:
stream_url (str) – The URL of the stream to connect to.
- Return type:
None
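The combination of a background capture loop, a per-frame callback, and a "latest frame" getter can be sketched without OpenCV. In this illustration the frames come from any iterable rather than a camera; `LatestFrameBuffer` is a hypothetical stand-in and does not reproduce VideoAPI's reconnect or display logic.

```python
import threading
from typing import Any, Callable, Iterable, Optional


class LatestFrameBuffer:
    """Keep only the newest frame produced by a background reader thread."""

    def __init__(self, on_frame: Optional[Callable[[Any, str], None]] = None) -> None:
        self._on_frame = on_frame
        self._lock = threading.Lock()
        self._frame: Optional[Any] = None
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self, frames: Iterable[Any], stream_url: str) -> None:
        """Consume `frames` in a daemon thread, as a capture loop would."""

        def capture_loop() -> None:
            for frame in frames:
                if self._stop.is_set():
                    break
                with self._lock:
                    self._frame = frame
                if self._on_frame is not None:
                    self._on_frame(frame, stream_url)

        self._thread = threading.Thread(target=capture_loop, daemon=True)
        self._thread.start()

    def get_current_frame(self) -> Optional[Any]:
        with self._lock:
            return self._frame

    def close(self) -> None:
        """Ask the loop to stop and wait for the thread to finish."""
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
```

The lock keeps the getter from observing a half-updated reference while the capture thread writes, and the stop event lets `close()` end the loop between frames.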
- toggle_display()
Toggle whether to show the OpenCV display window.
- Return type:
None
- class pymirokai.core.video_api.VideoStreamManager(stream_base_url=None)
Bases:
object
Manage multiple concurrent video streams with OpenCV.
- Parameters:
stream_base_url (str | None)
- add_stream(stream_name, stream_url, display=False, on_frame=None)
Add and start a new managed video stream.
Creates a VideoAPI instance for the provided stream, starts capturing frames in a background thread, and optionally displays them in real time.
- Parameters:
stream_name (str) – Unique name for the stream.
stream_url (str) – Relative or absolute URL of the video stream.
display (bool) – Whether to show the stream in an OpenCV window.
on_frame (Optional[Callable[[cv2.Mat,str],None]]) – Optional per-frame callback invoked as (frame, url).
- Return type:
None
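Since stream_url may be relative or absolute, a manager constructed with stream_base_url presumably combines the two. How pymirokai does this is not stated here; the sketch below shows one plausible rule, with `resolve_stream_url` as a hypothetical helper: URLs that carry a scheme are used unchanged, and anything else is appended to the base.

```python
from typing import Optional
from urllib.parse import urlparse


def resolve_stream_url(stream_base_url: Optional[str], stream_url: str) -> str:
    """Return an absolute stream URL, joining relative paths onto the base."""
    # A URL with a scheme (http://, rtsp://, ...) is treated as absolute.
    if urlparse(stream_url).scheme or not stream_base_url:
        return stream_url
    # Normalise slashes so exactly one separates base and path.
    return f"{stream_base_url.rstrip('/')}/{stream_url.lstrip('/')}"
```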
- close_all_streams()
Stop and close all active video streams.
Ensures all capture threads and display windows are properly terminated and resources are released.
- Return type:
None
- remove_stream(stream_name)
Stop and remove a specific stream.
- Parameters:
stream_name (str)
- Return type:
None
- set_display(stream_name, value)
Enable or disable display for a given stream.
- Parameters:
stream_name (str)
value (bool)
- Return type:
None
- start_display_thread()
Start a dedicated display thread to render all active streams.
- Return type:
None
Core manager responsible for registering, executing, and monitoring robot skills.
This module exposes the SkillsManager class, which provides the asynchronous interface for launching and managing mission-based skill execution.
- class pymirokai.core.skills_manager.SkillsManager(upload_dir, robot)
Bases:
object
Manages custom (foreground) skills that can be called once and produce results.
- Parameters:
upload_dir (Path)
robot (Robot)
- async call_skill(function_name, jwt_token, *args, **kwargs)
Execute a registered skill asynchronously.
Validates parameters, injects a temporary Robot instance if required, and runs the corresponding function (sync or async).
- Parameters:
function_name (str) – Name of the skill to execute.
jwt_token (str) – JWT token to authenticate nested robot calls.
*args – Positional arguments passed to the skill.
**kwargs – Keyword arguments passed to the skill.
- Return type:
dict[str,Any]
- Returns:
Dictionary containing:
“value”: Result returned by the skill (or None on failure).
“status”: One of "COMPLETED", "CANCELLED", or "ERROR".
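The return shape described above can be produced by a thin wrapper that runs a sync or async function and maps each outcome to a status string. This is an illustrative sketch only: `run_skill` is hypothetical, and parameter validation and Robot injection are omitted.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_skill(func: Callable[..., Any], *args: Any, **kwargs: Any) -> dict[str, Any]:
    """Run `func` (sync or async) and report the outcome as value/status."""
    try:
        if inspect.iscoroutinefunction(func):
            value = await func(*args, **kwargs)
        else:
            value = func(*args, **kwargs)
        return {"value": value, "status": "COMPLETED"}
    except asyncio.CancelledError:
        # Cancellation is reported to the caller instead of propagating.
        return {"value": None, "status": "CANCELLED"}
    except Exception:
        return {"value": None, "status": "ERROR"}
```

`asyncio.CancelledError` derives from `BaseException` on Python 3.8 and later, so it needs its own `except` clause and is not caught by `except Exception`.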
- async cancel_auto_start_skill(function_name)
Cancel a running auto-start skill by name and await its completion.
Cancellation works even if the task was created on a different event loop.
Looks up the task in self.running_auto_start_skills. If not found, the call is a no-op (a warning is logged). If found:
If the task is attached to the current loop, cancel and await it locally.
Otherwise, schedule cancellation on the owning loop and await completion via _cancel_task_cross_loop.
- Parameters:
function_name (str) – The auto-start skill name (dict key in running_auto_start_skills).
- Return type:
None
- Returns:
None
- async cancel_mission(function_name)
Cancel a running mission by name and await its completion.
Cancellation works even if the task was created on a different event loop.
Looks up the mission task in self.running_missions. If not found, the call is a no-op (a warning is logged). If found:
If the task is attached to the current loop, cancel and await it locally.
Otherwise, schedule cancellation on the owning loop and await completion via _cancel_task_cross_loop.
- Parameters:
function_name (str) – The mission/skill name (dict key in running_missions).
- Return type:
None
- Returns:
None
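The same-loop versus cross-loop distinction used by cancel_mission and cancel_auto_start_skill can be sketched with standard asyncio primitives. The internals of _cancel_task_cross_loop are not documented here, so this is one possible approach; `cancel_task_any_loop` is a hypothetical name.

```python
import asyncio


async def cancel_task_any_loop(task: asyncio.Task) -> None:
    """Cancel `task` and wait for it, whichever event loop owns it."""

    async def cancel_and_wait() -> None:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # expected: the task finished by being cancelled

    owner = task.get_loop()
    if owner is asyncio.get_running_loop():
        # Same loop: cancel and await locally.
        await cancel_and_wait()
        return
    # Different loop: tasks are not thread-safe, so run the cancellation
    # on the owning loop and wait for it from the current one.
    future = asyncio.run_coroutine_threadsafe(cancel_and_wait(), owner)
    await asyncio.wrap_future(future)
```

`asyncio.run_coroutine_threadsafe` returns a `concurrent.futures.Future`, and `asyncio.wrap_future` makes it awaitable on the calling loop without blocking it.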
- get_skill_info(file_path)
Extract decorated skills and auto_start_skills from a single Python file.
- Parameters:
file_path (Path) – File to inspect.
- Return type:
tuple[dict[str,Any],dict[str,Any]]
- Returns:
Tuple[dict[str, Any], dict[str, Any]] – Found skills and auto_start_skills.
- get_skills_list()
Load skills and auto-start skills from all .py files in the upload directory.
- Return type:
dict[str,dict[str,Any]]
- Returns:
dict[str, dict[str, Any]] – Mapping of skill name to metadata.
- list_running_auto_start_skills()
Return a list of running auto-start skills.
- Return type:
list[str]
- Returns:
list[str] – Auto-start skill names.
- list_running_missions()
Return a list of running missions.
- Return type:
list[str]
- Returns:
list[str] – Skill names.
- async monitor_loop(interval=1.0)
Continuously monitor and reload skills on disk.
Periodically rescans the upload directory for skill files and updates the registry. Runs indefinitely.
- Parameters:
interval (float) – Delay in seconds between update checks.
- Return type:
None
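A periodic rescan of this kind typically compares a snapshot of the directory with the previous one. The sketch below shows that comparison using modification times. It is an assumption about the mechanism, not a description of monitor_loop itself, and both helper names are hypothetical.

```python
from pathlib import Path


def snapshot_skill_files(upload_dir: Path) -> dict[str, float]:
    """Record the modification time of every .py file in `upload_dir`."""
    return {path.name: path.stat().st_mtime for path in upload_dir.glob("*.py")}


def diff_snapshots(
    old: dict[str, float], new: dict[str, float]
) -> tuple[list[str], list[str], list[str]]:
    """Return (added, removed, changed) file names between two snapshots."""
    added = sorted(new.keys() - old.keys())
    removed = sorted(old.keys() - new.keys())
    changed = sorted(name for name in new.keys() & old.keys() if new[name] != old[name])
    return added, removed, changed
```

A monitoring loop would take a snapshot every `interval` seconds, diff it against the previous one, and reload only when one of the three lists is non-empty.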
- serialize_skills()
Serialize skill metadata for external use.
- Return type:
dict[str,Any]
- Returns:
dict[str, Any] – Skill details without actual function references.
- async update_skills()
Refresh the list of available skills.
Reloads all skill modules and cancels any missions or auto-start skills that have been removed or disabled.
- Return type:
None