Core Modules

This module contains the RestAPI class for communicating with the REST API.

class pymirokai.core.rest_api.RestAPI(api_version='v1', retry_interval=5, uds_path='/tmp/rest_api.sock', use_uds=False)

Bases: object

Class to handle REST API communication, supporting both Unix Domain Socket (UDS) and TCP/IP.

Parameters:
  • api_version (str)

  • retry_interval (int)

  • uds_path (str)

  • use_uds (bool)

async authenticate()

Authenticate with the REST API and store the JWT token.

Performs an authorization request and extracts the client ID, access level, and token. On success, calls set_jwt_token.

Return type:

None

async close()

Close the REST API connection.

Return type:

None

async connect()

Check the connection status of the REST API and update the status.

Return type:

None

async enable_skill_file(skill_name, enable)

Enable or disable a skill on the robot.

Parameters:
  • skill_name (str) – Name of the skill.

  • enable (bool) – Whether to enable or disable the skill.

Return type:

dict[str, Any]

Returns:

dict – The response from the API.

async get_data_and_update_token_and_client_id(response)

From a response from the API, recursively parse JSON, update client_id and JWT token.

Return type:

dict[str, Any]

async get_services(system=None, service=None)

Retrieve available services or logs for a specific system/service.

Parameters:
  • system (Optional[str]) – Optional system name (“head” or “base”).

  • service (Optional[str]) – Optional service name to retrieve logs for.

Return type:

dict[str, Any]

Returns:

Dictionary describing either available services or logs.

async list_skill_files()

Retrieve the list of uploaded skills.

Return type:

dict[str, Any]

Returns:

dict – The list of skills from the API.

async remove_skill_file(skill_name)

Remove a skill from the robot.

Parameters:

skill_name (str) – Name of the skill to remove.

Return type:

dict[str, Any]

Returns:

dict – The response from the API.

async send_command(endpoint, model, jwt_token)

Send a command to the REST API.

Parameters:
  • endpoint (str) – Target endpoint (e.g., “get”, “skill/enable”).

  • model (BaseModel) – Pydantic model representing the payload.

  • jwt_token (str) – Bearer token for authentication.

Return type:

dict[str, Any]

Returns:

Parsed JSON response from the API.

Raises:

aiohttp.ClientResponseError – If the server returns an error status.

async setup_connection()

Get the JWT token and client_id, and retry until connected.

async start(api_url, api_key, set_jwt_token=None, get_jwt_token=None, use_uds=None, ssl_context=None)

Start and maintain the REST API session.

Initializes an aiohttp client session (TCP or UDS) and continuously attempts to connect until a valid JWT token is obtained.

Parameters:
  • api_url (str) – Base URL of the API (ignored if using UDS).

  • api_key (str) – API key for authentication.

  • set_jwt_token (Optional[callable]) – Callback to store the received JWT token.

  • get_jwt_token (Optional[callable]) – Callback to retrieve the current JWT token.

  • use_uds (Optional[bool]) – Whether to use a Unix Domain Socket.

  • ssl_context (Optional[SSLContext]) – Optional SSL context for HTTPS connections.

Return type:

None

async upload_skill_file(skill_path)

Upload a skill to the robot.

Parameters:

skill_path (str) – Path to the Python skill to upload.

Return type:

dict[str, Any]

Returns:

dict – The response from the API.

This module contains the WebSocketAPI class for communicating with the WebSocket.

class pymirokai.core.websocket_api.WebSocketAPI(retry_interval=5)

Bases: object

Class to handle WebSocket communication.

Parameters:

retry_interval (int)

async close()

Close the WebSocket connection.

Return type:

None

async connect()

Maintain a persistent WebSocket connection.

Continuously attempts to connect and listen for messages. Automatically re-subscribes to previous topics after reconnect.

Return type:

None

get_data()

Get the latest data received from the WebSocket.

Return type:

dict[str, Any]

Returns:

Dict[str, Any] – The latest data.

async publish(topic, data)

Publish a JSON message to a user-specific topic.

Automatically prefixes the topic with the authenticated client ID.

Parameters:
  • topic (str) – Subtopic name (e.g., “alerts” or “status”).

  • data (str) – JSON-encoded message body.

Raises:
  • websockets.ConnectionClosedError – If the connection is closed.

  • json.JSONDecodeError – If the input data is not valid JSON.

Return type:

None

register_callback(topic, callback)

Register a callback to be invoked when a topic message arrives.

Supports both synchronous and asynchronous functions.

Parameters:
  • topic (str) – Topic name to monitor.

  • callback (Callable[[dict[str, Any]], None]) – Function or coroutine called with the message dict.

Return type:

None

async start(ws_url, get_jwt_token, ssl_context=None)

Start the WebSocket connection.

Parameters:
  • ws_url (str) – The url of the websocket

  • get_jwt_token (callable) – Callback to get the JWT token

  • ssl_context (ssl.SSLContext | None) – SSL context to use to connect (if wss).

Return type:

None

async subscribe(topic)

Subscribe to a topic.

Parameters:

topic (str) – The topic to subscribe to.

Return type:

None

async unsubscribe(topic)

Unsubscribe from a topic.

Parameters:

topic (str) – The topic to unsubscribe from.

Return type:

None

Video API interface for managing robot camera streams.

Provides methods to start, stop, and handle video streams over the network.

class pymirokai.core.video_api.VideoAPI(display=False, timeout=5000, on_frame=None, reconnect_delay=5.0)

Bases: object

Handle a single OpenCV-based video stream with optional per-frame callback and auto-reconnect.

Parameters:
  • display (bool)

  • timeout (int)

  • on_frame (Callable[[cv2.Mat, str], None] | None)

  • reconnect_delay (float)

close()

Stop the stream capture and release resources.

Return type:

None

connect(stream_url)

Connect to the video stream using OpenCV.

Parameters:

stream_url (str) – The video stream URL.

Return type:

bool

Returns:

bool – True if successfully connected, False otherwise.

get_current_frame()

Return the most recent captured frame.

Return type:

Optional[cv2.Mat]

Returns:

Optional[cv2.Mat] – The latest frame, or None if unavailable.

set_display(value)

Set whether to display the stream.

Parameters:

value (bool) – True to display, False to hide.

Return type:

None

start(stream_url)

Start the capture loop in a background thread.

Parameters:

stream_url (str) – The URL of the stream to connect to.

Return type:

None

toggle_display()

Toggle whether to show the OpenCV display window.

Return type:

None

class pymirokai.core.video_api.VideoStreamManager(stream_base_url=None)

Bases: object

Manage multiple concurrent video streams with OpenCV.

Parameters:

stream_base_url (str | None)

add_stream(stream_name, stream_url, display=False, on_frame=None)

Add and start a new managed video stream.

Creates a VideoAPI instance for the provided stream, starts capturing frames in a background thread, and optionally displays them in real time.

Parameters:
  • stream_name (str) – Unique name for the stream.

  • stream_url (str) – Relative or absolute URL of the video stream.

  • display (bool) – Whether to show the stream in an OpenCV window.

  • on_frame (Optional[Callable[[cv2.Mat, str], None]]) – Optional per-frame callback invoked as (frame, url).

Return type:

None

close_all_streams()

Stop and close all active video streams.

Ensures all capture threads and display windows are properly terminated and resources are released.

Return type:

None

remove_stream(stream_name)

Stop and remove a specific stream.

Return type:

None

Parameters:

stream_name (str)

set_display(stream_name, value)

Enable or disable display for a given stream.

Return type:

None

Parameters:
  • stream_name (str)

  • value (bool)

start_display_thread()

Start a dedicated display thread to render all active streams.

Return type:

None

Core manager responsible for registering, executing, and monitoring robot skills.

This module exposes the SkillsManager class, which provides the asynchronous interface for launching and managing mission-based skill execution.

class pymirokai.core.skills_manager.SkillsManager(upload_dir, robot)

Bases: object

Manages custom (foreground) skills that can be called once and produce results.

Parameters:
  • upload_dir (Path)

  • robot (Robot)

async call_skill(function_name, jwt_token, *args, **kwargs)

Execute a registered skill asynchronously.

Validates parameters, injects a temporary Robot instance if required, and runs the corresponding function (sync or async).

Parameters:
  • function_name (str) – Name of the skill to execute.

  • jwt_token (str) – JWT token to authenticate nested robot calls.

  • *args – Positional arguments passed to the skill.

  • **kwargs – Keyword arguments passed to the skill.

Return type:

dict[str, Any]

Returns:

Dictionary containing – - “value”: Result returned by the skill (or None on failure). - “status”: One of "COMPLETED", "CANCELLED", or "ERROR".

async cancel_auto_start_skill(function_name)

Cancel a running auto-start skill by name and await its completion.

Cancel, even if its task was created on a different event loop.

Looks up the task in self.running_auto_start_skills. If not found, the call is a no-op (a warning is logged). If found:

  • If the task is attached to the current loop, cancel and await it locally.

  • Otherwise, schedule cancellation on the owning loop and await completion

via _cancel_task_cross_loop.

Parameters:

function_name (str) – The auto-start skill name (dict key in running_auto_start_skills).

Return type:

None

Returns:

None

async cancel_mission(function_name)

Cancel a running mission by name and await its completion.

Cancel, even if its task was created on a different event loop.

Looks up the mission task in self.running_missions. If not found, the call is a no-op (a warning is logged). If found:

  • If the task is attached to the current loop, cancel and await it locally.

  • Otherwise, schedule cancellation on the owning loop and await completion

via _cancel_task_cross_loop.

Parameters:

function_name (str) – The mission/skill name (dict key in running_missions).

Return type:

None

Returns:

None

get_skill_info(file_path)

Extract decorated skills and auto_start_skills from a single Python file.

Parameters:

file_path (Path) – File to inspect.

Return type:

tuple[dict[str, Any], dict[str, Any]]

Returns:

Tuple[dict[str, Any], dict[str, Any]] – Found skills and auto_start_skills.

get_skills_list()

Load skills and auto-start skills from all .py files in the upload directory.

Return type:

dict[str, dict[str, Any]]

Returns:

dict[str, dict[str, Any]] – Mapping of skill name to metadata.

list_running_auto_start_skills()

Return a list of running auto-start skills.

Return type:

list[str]

Returns:

list[str] – auto-start skill names.

list_running_missions()

Return a list of running missions.

Return type:

list[str]

Returns:

list[str] – Skill names.

async monitor_loop(interval=1.0)

Continuously monitor and reload skills on disk.

Periodically rescans the upload directory for skill files and updates the registry. Runs indefinitely.

Parameters:

interval (float) – Delay in seconds between update checks.

Return type:

None

serialize_skills()

Serialize skill metadata for external use.

Return type:

dict[str, Any]

Returns:

dict[str, Any] – Skill details without actual function references.

async update_skills()

Refresh the list of available skills.

Reloads all skill modules and cancels any missions or auto-start skills that have been removed or disabled.

Return type:

None