Python SDK for runqy - write distributed task handlers with simple decorators.
Documentation · Website
```bash
pip install runqy-python
```

Create tasks that run on runqy-worker using simple decorators:

```python
from runqy_python import task, run

@task
def process(payload: dict) -> dict:
    return {"message": "Hello!", "received": payload}

if __name__ == "__main__":
    run()
```

For ML inference tasks, use `@load` to load models once at startup:
```python
from runqy_python import task, load, run

@load
def setup():
    """Runs once before the ready signal. Return value is passed to @task as ctx."""
    model = load_heavy_model()  # Load weights, etc.
    return {"model": model}

@task
def process(payload: dict, ctx: dict) -> dict:
    """Process tasks using the loaded model."""
    prediction = ctx["model"].predict(payload["input"])
    return {"prediction": prediction}

if __name__ == "__main__":
    run()
```

For lightweight tasks that don't need to stay loaded in memory, use `run_once()`:
```python
from runqy_python import task, run_once

@task
def process(payload: dict) -> dict:
    return {"result": payload["x"] * 2}

if __name__ == "__main__":
    run_once()  # Process one task and exit
```

| Function | Behavior | Use case |
|---|---|---|
| `run()` | Loops forever, handles many tasks | ML inference (expensive load) |
| `run_once()` | Handles one task, then exits | Lightweight tasks |
The SDK handles the runqy-worker stdin/stdout JSON protocol:

- Load phase: Calls the `@load` function (if registered)
- Ready signal: Sends `{"status": "ready"}` after load completes
- Task input: Reads JSON from stdin: `{"task_id": "...", "payload": {...}}`
- Response: Writes JSON to stdout: `{"task_id": "...", "result": {...}, "error": null, "retry": false}`
The SDK also includes a client for enqueuing tasks to a runqy server:
```python
from runqy_python import RunqyClient

client = RunqyClient("http://localhost:3000", api_key="your-api-key")

# Enqueue a task
task = client.enqueue("inference.default", {"input": "hello"})
print(f"Task ID: {task.task_id}")

# Check result
result = client.get_task(task.task_id)
print(f"State: {result.state}, Result: {result.result}")
```

Or use the convenience function:
```python
from runqy_python import enqueue

task = enqueue(
    "inference.default",
    {"input": "hello"},
    server_url="http://localhost:3000",
    api_key="your-api-key"
)
```

`RunqyClient(server_url, api_key, timeout=30)`
- `server_url`: Base URL of the runqy server
- `api_key`: API key for authentication
- `timeout`: Default request timeout in seconds
`client.enqueue(queue, payload, timeout=300)`
- `queue`: Queue name (e.g., `"inference.default"`)
- `payload`: Task payload as a dict
- `timeout`: Task execution timeout in seconds
- Returns: `TaskInfo` with `task_id`, `queue`, `state`
`client.get_task(task_id)`
- `task_id`: Task ID from `enqueue`
- Returns: `TaskInfo` with `task_id`, `queue`, `state`, `result`, `error`
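Putting the two calls together, a common pattern is to enqueue and then poll `get_task` until the task reaches a terminal state. The sketch below assumes `"completed"` and `"failed"` as terminal state names; the actual state values are not documented above.

```python
import time

from runqy_python import RunqyClient

client = RunqyClient("http://localhost:3000", api_key="your-api-key")
task = client.enqueue("inference.default", {"input": "hello"}, timeout=300)

# Poll until the task settles. The terminal state names here
# ("completed", "failed") are assumptions for illustration.
while True:
    info = client.get_task(task.task_id)
    if info.state in ("completed", "failed"):
        break
    time.sleep(1)

print(f"State: {info.state}, Result: {info.result}, Error: {info.error}")
```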
- `RunqyError`: Base exception for all client errors
- `AuthenticationError`: Invalid or missing API key
- `TaskNotFoundError`: Task ID doesn't exist
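A short sketch of handling these when looking up a task; it assumes the exception classes are importable from the package root, which is not stated above.

```python
# Sketch only: the import path for the exceptions is an assumption.
from runqy_python import (
    RunqyClient,
    RunqyError,
    AuthenticationError,
    TaskNotFoundError,
)

client = RunqyClient("http://localhost:3000", api_key="your-api-key")

try:
    info = client.get_task("unknown-task-id")
except TaskNotFoundError:
    print("No task with that ID")
except AuthenticationError:
    print("Check the API key")
except RunqyError as exc:
    print(f"Client error: {exc}")
```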
```bash
# Install in editable mode
pip install -e .

# Test task execution
echo '{"task_id":"t1","payload":{"foo":"bar"}}' | python your_model.py

# Test client import
python -c "from runqy_python import RunqyClient; print('OK')"
```