From cf9e63b7657568ef4f1d17f4f309302450c69064 Mon Sep 17 00:00:00 2001 From: Cosmin Paunel Date: Mon, 9 Mar 2026 23:07:37 +0200 Subject: [PATCH 1/3] fix: resumable runtime --- .../uipath_openai_agents/runtime/__init__.py | 2 + .../uipath_openai_agents/runtime/factory.py | 62 +++++- .../uipath_openai_agents/runtime/runtime.py | 10 +- .../uipath_openai_agents/runtime/storage.py | 195 ++++++++++++++++++ .../tests/runtime/test_factory_resumable.py | 60 ++++++ .../tests/runtime/test_runtime_session.py | 85 ++++++++ .../tests/runtime/test_storage.py | 68 ++++++ 7 files changed, 477 insertions(+), 5 deletions(-) create mode 100644 packages/uipath-openai-agents/src/uipath_openai_agents/runtime/storage.py create mode 100644 packages/uipath-openai-agents/tests/runtime/test_factory_resumable.py create mode 100644 packages/uipath-openai-agents/tests/runtime/test_runtime_session.py create mode 100644 packages/uipath-openai-agents/tests/runtime/test_storage.py diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/__init__.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/__init__.py index bdae18f9..ac883cfd 100644 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/__init__.py +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/__init__.py @@ -10,6 +10,7 @@ from .factory import UiPathOpenAIAgentRuntimeFactory from .runtime import UiPathOpenAIAgentRuntime from .schema import get_agent_schema, get_entrypoints_schema +from .storage import SqliteResumableStorage def register_runtime_factory() -> None: @@ -33,6 +34,7 @@ def create_factory( "get_agent_schema", "UiPathOpenAIAgentRuntimeFactory", "UiPathOpenAIAgentRuntime", + "SqliteResumableStorage", "get_agent_context_type", "parse_input_to_context", ] diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py index 226df03d..e6bead15 100644 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py @@ -1,11 +1,16 @@ """Factory for creating OpenAI Agents runtimes from openai_agents.json configuration.""" import asyncio +import os from typing import Any -from agents import Agent +from agents import Agent, SQLiteSession from openinference.instrumentation.openai_agents import OpenAIAgentsInstrumentor +from uipath.platform.resume_triggers import ( + UiPathResumeTriggerHandler, +) from uipath.runtime import ( + UiPathResumableRuntime, UiPathRuntimeContext, UiPathRuntimeFactorySettings, UiPathRuntimeProtocol, @@ -20,6 +25,7 @@ UiPathOpenAIAgentsRuntimeError, ) from uipath_openai_agents.runtime.runtime import UiPathOpenAIAgentRuntime +from uipath_openai_agents.runtime.storage import SqliteResumableStorage class UiPathOpenAIAgentRuntimeFactory: @@ -41,6 +47,8 @@ def __init__( self._agent_cache: dict[str, Agent] = {} self._agent_loaders: dict[str, OpenAiAgentLoader] = {} self._agent_lock = asyncio.Lock() + self._storage: SqliteResumableStorage | None = None + self._storage_lock = asyncio.Lock() self._setup_instrumentation() @@ -54,6 +62,38 @@ def _load_config(self) -> OpenAiAgentsConfig: self._config = OpenAiAgentsConfig() return self._config + def _get_connection_string(self) -> str: + """Get the SQLite connection string used for resumable state.""" + if self.context.state_file_path is not None: + return self.context.state_file_path + + if self.context.runtime_dir and self.context.state_file: + path = os.path.join(self.context.runtime_dir, self.context.state_file) + if ( + not self.context.resume + and self.context.job_id is None + and not self.context.keep_state_file + ): + # If not resuming and no job id, delete previous local state. + if os.path.exists(path): + try: + os.remove(path) + except OSError: + pass + os.makedirs(self.context.runtime_dir, exist_ok=True) + return path + + default_path = os.path.join("__uipath", "state.db") + os.makedirs(os.path.dirname(default_path), exist_ok=True) + return default_path + + async def _get_storage(self) -> SqliteResumableStorage: + """Get or create the shared resumable storage instance.""" + async with self._storage_lock: + if self._storage is None: + self._storage = SqliteResumableStorage(self._get_connection_string()) + return self._storage + async def _load_agent(self, entrypoint: str) -> Agent: """ Load an agent for the given entrypoint. @@ -185,7 +225,7 @@ async def get_storage(self) -> UiPathRuntimeStorageProtocol | None: """ Get the shared storage instance. """ - return None + return await self._get_storage() async def get_settings(self) -> UiPathRuntimeFactorySettings | None: """ @@ -213,10 +253,22 @@ async def _create_runtime_instance( Returns: Configured runtime instance """ - return UiPathOpenAIAgentRuntime( + storage = await self._get_storage() + trigger_manager = UiPathResumeTriggerHandler() + session = SQLiteSession(session_id=runtime_id, db_path=storage.db_path) + + base_runtime = UiPathOpenAIAgentRuntime( agent=agent, runtime_id=runtime_id, entrypoint=entrypoint, + session=session, + ) + + return UiPathResumableRuntime( + delegate=base_runtime, + storage=storage, + trigger_manager=trigger_manager, + runtime_id=runtime_id, ) async def new_runtime( @@ -246,5 +298,9 @@ async def dispose(self) -> None: for loader in self._agent_loaders.values(): await loader.cleanup() + if self._storage is not None: + await self._storage.close() + self._storage = None + self._agent_loaders.clear() self._agent_cache.clear() diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py index eec54198..491e7cfe 100644 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py @@ -4,7 +4,7 @@ from typing import Any, AsyncGenerator from uuid import uuid4 -from agents import Agent, Runner +from agents import Agent, Runner, SQLiteSession from pydantic import BaseModel from uipath.core.serialization import serialize_json from uipath.runtime import ( @@ -36,6 +36,7 @@ def __init__( agent: Agent, runtime_id: str | None = None, entrypoint: str | None = None, + session: SQLiteSession | None = None, ): """ Initialize the runtime. @@ -44,10 +45,12 @@ def __init__( agent: The OpenAI Agent to execute runtime_id: Unique identifier for this runtime instance entrypoint: Optional entrypoint name (for schema generation) + session: Optional OpenAI Agents SDK session for persistent memory """ self.agent: Agent = agent self.runtime_id: str = runtime_id or "default" self.entrypoint: str | None = entrypoint + self._session = session # Detect context type from agent's generic parameter self._context_type: type[BaseModel] | None = get_agent_context_type(agent) @@ -142,6 +145,7 @@ async def _run_agent( starting_agent=self.agent, input=agent_input, context=context, + session=self._session, ) yield self._create_success_result(result.final_output) @@ -170,6 +174,7 @@ async def _run_agent_streamed( starting_agent=self.agent, input=agent_input, context=context, + session=self._session, ) # Stream events from the agent @@ -354,4 +359,5 @@ async def get_schema(self) -> UiPathRuntimeSchema: async def dispose(self) -> None: """Cleanup runtime resources.""" - pass + if self._session is not None: + self._session.close() diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/storage.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/storage.py new file mode 100644 index 00000000..0863abd1 --- /dev/null +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/storage.py @@ -0,0 +1,195 @@ +"""OpenAI Agents SDK SQLiteSession adapter for UiPath resumable storage.""" + +import asyncio +import json +import os +from typing import Any, cast + +from agents import SQLiteSession +from pydantic import BaseModel +from uipath.core.serialization import serialize_json +from uipath.core.triggers import UiPathResumeTrigger + + +class SqliteResumableStorage: + """UiPath resumable storage backed by OpenAI Agents SDK SQLiteSession.""" + + _STATE_ITEM_TYPE = "uipath_runtime_state" + + def __init__(self, db_path: str): + self.db_path = db_path + self._lock = asyncio.Lock() + self._sessions: dict[str, SQLiteSession] = {} + + def _state_session_id(self, runtime_id: str) -> str: + return f"__uipath_state__{runtime_id}" + + def _get_state_session(self, runtime_id: str) -> SQLiteSession: + session_id = self._state_session_id(runtime_id) + session = self._sessions.get(session_id) + if session is None: + db_dir = os.path.dirname(self.db_path) + if db_dir: + os.makedirs(db_dir, exist_ok=True) + session = SQLiteSession(session_id=session_id, db_path=self.db_path) + self._sessions[session_id] = session + return session + + async def _load_state(self, runtime_id: str) -> dict[str, Any]: + session = self._get_state_session(runtime_id) + items = await session.get_items(limit=1) + if not items: + return {"triggers": [], "kv": {}} + + item = items[0] + if not isinstance(item, dict): + return {"triggers": [], "kv": {}} + + if item.get("type") != self._STATE_ITEM_TYPE: + return {"triggers": [], "kv": {}} + + state = item.get("state") + if not isinstance(state, dict): + return {"triggers": [], "kv": {}} + if not isinstance(state.get("triggers"), list): + state["triggers"] = [] + if not isinstance(state.get("kv"), dict): + state["kv"] = {} + return state + + async def _save_state(self, runtime_id: str, state: dict[str, Any]) -> None: + session = self._get_state_session(runtime_id) + await session.clear_session() + state_item: dict[str, Any] = { + "type": self._STATE_ITEM_TYPE, + "state": state, + } + await session.add_items( + [ + cast(Any, state_item), + ] + ) + + async def save_triggers( + self, runtime_id: str, triggers: list[UiPathResumeTrigger] + ) -> None: + """Save resume triggers, replacing all existing triggers for this runtime_id.""" + async with self._lock: + state = await self._load_state(runtime_id) + serialized_triggers: list[str] = [] + + for trigger in triggers: + if trigger.interrupt_id is None: + raise ValueError("Trigger interrupt_id cannot be None.") + + trigger_data = trigger.model_dump() + trigger_data["payload"] = trigger.payload + trigger_data["trigger_name"] = trigger.trigger_name + serialized_triggers.append(serialize_json(trigger_data)) + + state["triggers"] = serialized_triggers + await self._save_state(runtime_id, state) + + async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None: + """Get all triggers for runtime_id.""" + async with self._lock: + state = await self._load_state(runtime_id) + + trigger_payloads = state.get("triggers", []) + if not trigger_payloads: + return None + + triggers: list[UiPathResumeTrigger] = [] + for payload in trigger_payloads: + if isinstance(payload, str): + triggers.append(UiPathResumeTrigger.model_validate_json(payload)) + return triggers or None + + async def delete_trigger( + self, runtime_id: str, trigger: UiPathResumeTrigger + ) -> None: + """Delete resume trigger from storage.""" + async with self._lock: + state = await self._load_state(runtime_id) + serialized = state.get("triggers", []) + if not isinstance(serialized, list): + serialized = [] + + kept: list[str] = [] + for payload in serialized: + if not isinstance(payload, str): + continue + existing = UiPathResumeTrigger.model_validate_json(payload) + if existing.interrupt_id != trigger.interrupt_id: + kept.append(payload) + + state["triggers"] = kept + await self._save_state(runtime_id, state) + + async def set_value( + self, + runtime_id: str, + namespace: str, + key: str, + value: Any, + ) -> None: + """Save arbitrary key-value pair to storage.""" + if not ( + isinstance(value, str) + or isinstance(value, dict) + or isinstance(value, BaseModel) + or value is None + ): + raise TypeError("Value must be str, dict, BaseModel or None.") + + async with self._lock: + state = await self._load_state(runtime_id) + kv = state.get("kv") + if not isinstance(kv, dict): + kv = {} + state["kv"] = kv + + namespace_data = kv.get(namespace) + if not isinstance(namespace_data, dict): + namespace_data = {} + kv[namespace] = namespace_data + + namespace_data[key] = self._dump_value(value) + await self._save_state(runtime_id, state) + + async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any: + """Get arbitrary key-value pair from storage.""" + async with self._lock: + state = await self._load_state(runtime_id) + + kv = state.get("kv") + if not isinstance(kv, dict): + return None + namespace_data = kv.get(namespace) + if not isinstance(namespace_data, dict): + return None + + return self._load_value(namespace_data.get(key)) + + async def close(self) -> None: + """Close all OpenAI Agents SDK SQLite sessions.""" + async with self._lock: + for session in self._sessions.values(): + session.close() + self._sessions.clear() + + def _dump_value(self, value: str | dict[str, Any] | BaseModel | None) -> str | None: + if value is None: + return None + if isinstance(value, str): + return "s:" + value + return "j:" + serialize_json(value) + + def _load_value(self, raw: str | None) -> Any: + if raw is None: + return None + if raw.startswith("s:"): + return raw[2:] + if raw.startswith("j:"): + return json.loads(raw[2:]) + return raw diff --git a/packages/uipath-openai-agents/tests/runtime/test_factory_resumable.py b/packages/uipath-openai-agents/tests/runtime/test_factory_resumable.py new file mode 100644 index 00000000..cf0933a2 --- /dev/null +++ b/packages/uipath-openai-agents/tests/runtime/test_factory_resumable.py @@ -0,0 +1,60 @@ +"""Tests for resumable runtime wiring in factory.""" + +import os + +import pytest +from agents import Agent, SQLiteSession +from uipath.runtime import UiPathResumableRuntime, UiPathRuntimeContext + +from uipath_openai_agents.runtime.factory import UiPathOpenAIAgentRuntimeFactory +from uipath_openai_agents.runtime.runtime import UiPathOpenAIAgentRuntime +from uipath_openai_agents.runtime.storage import SqliteResumableStorage + + +@pytest.mark.asyncio +async def test_factory_wraps_runtime_with_resumable_runtime(tmp_path): + context = UiPathRuntimeContext( + runtime_dir=str(tmp_path / "runtime"), + state_file="openai_state.db", + ) + factory = UiPathOpenAIAgentRuntimeFactory(context=context) + + try: + runtime = await factory._create_runtime_instance( + agent=Agent(name="test-agent", instructions="test"), + runtime_id="runtime-1", + entrypoint="agent", + ) + + assert isinstance(runtime, UiPathResumableRuntime) + assert isinstance(runtime.delegate, UiPathOpenAIAgentRuntime) + assert isinstance(runtime.storage, SqliteResumableStorage) + assert isinstance(runtime.delegate._session, SQLiteSession) + assert runtime.delegate._session.session_id == "runtime-1" + + storage = await factory.get_storage() + assert storage is not None + assert runtime.storage is storage + finally: + await factory.dispose() + + +def test_get_connection_string_cleans_state_for_fresh_local_run(tmp_path): + runtime_dir = tmp_path / "runtime" + runtime_dir.mkdir(parents=True, exist_ok=True) + state_path = runtime_dir / "state.db" + state_path.write_text("stale state") + + context = UiPathRuntimeContext( + runtime_dir=str(runtime_dir), + state_file="state.db", + resume=False, + job_id=None, + keep_state_file=False, + ) + factory = UiPathOpenAIAgentRuntimeFactory(context=context) + + resolved = factory._get_connection_string() + + assert resolved == str(state_path) + assert not os.path.exists(state_path) diff --git a/packages/uipath-openai-agents/tests/runtime/test_runtime_session.py b/packages/uipath-openai-agents/tests/runtime/test_runtime_session.py new file mode 100644 index 00000000..28912dd6 --- /dev/null +++ b/packages/uipath-openai-agents/tests/runtime/test_runtime_session.py @@ -0,0 +1,85 @@ +"""Tests for OpenAI session integration in runtime.""" + +from typing import Any + +import pytest +from agents import Agent +from uipath.runtime import UiPathRuntimeResult, UiPathRuntimeStatus + +from uipath_openai_agents.runtime.runtime import UiPathOpenAIAgentRuntime + + +class DummySession: + def __init__(self): + self.closed = False + + def close(self): + self.closed = True + + +@pytest.mark.asyncio +async def test_execute_forwards_session_to_runner(monkeypatch): + captured: dict[str, Any] = {} + session = DummySession() + + class FakeRunResult: + final_output = {"result": "ok"} + + async def fake_run(*args, **kwargs): + captured["session"] = kwargs.get("session") + return FakeRunResult() + + monkeypatch.setattr("uipath_openai_agents.runtime.runtime.Runner.run", fake_run) + + runtime = UiPathOpenAIAgentRuntime( + agent=Agent(name="test-agent", instructions="test"), + runtime_id="runtime-1", + entrypoint="agent", + session=session, # type: ignore[arg-type] + ) + + result = await runtime.execute(input={"messages": "hello"}) + + assert captured["session"] is session + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert result.output == {"result": "ok"} + + await runtime.dispose() + assert session.closed is True + + +@pytest.mark.asyncio +async def test_stream_forwards_session_to_runner(monkeypatch): + captured: dict[str, Any] = {} + session = DummySession() + + class FakeStreamingResult: + final_output = {"answer": "done"} + + async def stream_events(self): + if False: + yield None + + def fake_run_streamed(*args, **kwargs): + captured["session"] = kwargs.get("session") + return FakeStreamingResult() + + monkeypatch.setattr( + "uipath_openai_agents.runtime.runtime.Runner.run_streamed", + fake_run_streamed, + ) + + runtime = UiPathOpenAIAgentRuntime( + agent=Agent(name="test-agent", instructions="test"), + runtime_id="runtime-2", + entrypoint="agent", + session=session, # type: ignore[arg-type] + ) + + events = [event async for event in runtime.stream(input={"messages": "hello"})] + + assert captured["session"] is session + assert len(events) == 1 + assert isinstance(events[0], UiPathRuntimeResult) + assert events[0].status == UiPathRuntimeStatus.SUCCESSFUL + assert events[0].output == {"answer": "done"} diff --git a/packages/uipath-openai-agents/tests/runtime/test_storage.py b/packages/uipath-openai-agents/tests/runtime/test_storage.py new file mode 100644 index 00000000..0c199d2e --- /dev/null +++ b/packages/uipath-openai-agents/tests/runtime/test_storage.py @@ -0,0 +1,68 @@ +"""Tests for SqliteResumableStorage.""" + +import pytest +from pydantic import BaseModel +from uipath.core.triggers import ( + UiPathResumeTrigger, + UiPathResumeTriggerName, + UiPathResumeTriggerType, +) + +from uipath_openai_agents.runtime.storage import SqliteResumableStorage + + +class ValueModel(BaseModel): + value: str + + +@pytest.fixture +async def storage(tmp_path): + db_path = tmp_path / "state.db" + instance = SqliteResumableStorage(str(db_path)) + yield instance + await instance.close() + + +@pytest.mark.asyncio +async def test_set_and_get_values(storage: SqliteResumableStorage): + await storage.set_value("runtime-1", "ns", "str_key", "hello") + await storage.set_value("runtime-1", "ns", "dict_key", {"x": 1, "y": True}) + await storage.set_value("runtime-1", "ns", "model_key", ValueModel(value="ok")) + + assert await storage.get_value("runtime-1", "ns", "str_key") == "hello" + assert await storage.get_value("runtime-1", "ns", "dict_key") == {"x": 1, "y": True} + assert await storage.get_value("runtime-1", "ns", "model_key") == {"value": "ok"} + assert await storage.get_value("runtime-1", "ns", "missing") is None + + +@pytest.mark.asyncio +async def test_save_get_and_delete_triggers(storage: SqliteResumableStorage): + trigger_1 = UiPathResumeTrigger( + interrupt_id="interrupt-1", + trigger_type=UiPathResumeTriggerType.API, + trigger_name=UiPathResumeTriggerName.API, + payload={"msg": "one"}, + ) + trigger_2 = UiPathResumeTrigger( + interrupt_id="interrupt-2", + trigger_type=UiPathResumeTriggerType.JOB, + trigger_name=UiPathResumeTriggerName.JOB, + payload={"msg": "two"}, + ) + + await storage.save_triggers("runtime-1", [trigger_1, trigger_2]) + loaded = await storage.get_triggers("runtime-1") + + assert loaded is not None + assert len(loaded) == 2 + assert loaded[0].interrupt_id == "interrupt-1" + assert loaded[1].interrupt_id == "interrupt-2" + + await storage.delete_trigger("runtime-1", loaded[0]) + remaining = await storage.get_triggers("runtime-1") + assert remaining is not None + assert len(remaining) == 1 + assert remaining[0].interrupt_id == "interrupt-2" + + await storage.save_triggers("runtime-1", []) + assert await storage.get_triggers("runtime-1") is None From ca1362b429809befc529dbc2b8b92baf3a49da57 Mon Sep 17 00:00:00 2001 From: Cosmin Paunel Date: Tue, 10 Mar 2026 00:39:51 +0200 Subject: [PATCH 2/3] fix: resumable runtime --- packages/uipath-openai-agents/pyproject.toml | 4 +- .../uipath_openai_agents/runtime/factory.py | 1 + .../uipath_openai_agents/runtime/runtime.py | 406 ++++++++++-------- packages/uipath-openai-agents/uv.lock | 16 +- 4 files changed, 239 insertions(+), 188 deletions(-) diff --git a/packages/uipath-openai-agents/pyproject.toml b/packages/uipath-openai-agents/pyproject.toml index c4951598..66cee80b 100644 --- a/packages/uipath-openai-agents/pyproject.toml +++ b/packages/uipath-openai-agents/pyproject.toml @@ -1,13 +1,13 @@ [project] name = "uipath-openai-agents" -version = "0.0.9" +version = "0.0.10" description = "Python SDK that enables developers to build and deploy OpenAI agents to the UiPath Cloud Platform" readme = "README.md" requires-python = ">=3.11" dependencies = [ "aiosqlite>=0.20.0", "openai>=1.0.0", - "openai-agents>=0.6.5", + "openai-agents>=0.11.1", "openinference-instrumentation-openai-agents>=1.4.0", "uipath>=2.10.0, <2.11.0", "uipath-runtime>=0.9.0, <0.10.0", diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py index e6bead15..f3490cb1 100644 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/factory.py @@ -262,6 +262,7 @@ async def _create_runtime_instance( runtime_id=runtime_id, entrypoint=entrypoint, session=session, + storage=storage, ) return UiPathResumableRuntime( diff --git a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py index 491e7cfe..fca6e485 100644 --- a/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py +++ b/packages/uipath-openai-agents/src/uipath_openai_agents/runtime/runtime.py @@ -1,16 +1,20 @@ """Runtime class for executing OpenAI Agents within the UiPath framework.""" +import inspect import json +from collections.abc import Mapping from typing import Any, AsyncGenerator from uuid import uuid4 -from agents import Agent, Runner, SQLiteSession +from agents import Agent, Runner, RunState, SQLiteSession +from agents.items import ToolApprovalItem from pydantic import BaseModel from uipath.core.serialization import serialize_json from uipath.runtime import ( UiPathExecuteOptions, UiPathRuntimeResult, UiPathRuntimeStatus, + UiPathRuntimeStorageProtocol, UiPathStreamOptions, ) from uipath.runtime.errors import UiPathErrorCategory, UiPathErrorCode @@ -25,11 +29,12 @@ from .errors import UiPathOpenAIAgentsErrorCode, UiPathOpenAIAgentsRuntimeError from .schema import get_agent_schema, get_entrypoints_schema +_RUN_STATE_NAMESPACE = "openai_agents" +_RUN_STATE_KEY = "run_state" + class UiPathOpenAIAgentRuntime: - """ - A runtime class for executing OpenAI Agents within the UiPath framework. - """ + """A runtime class for executing OpenAI Agents within the UiPath framework.""" def __init__( self, @@ -37,22 +42,22 @@ def __init__( runtime_id: str | None = None, entrypoint: str | None = None, session: SQLiteSession | None = None, + storage: UiPathRuntimeStorageProtocol | None = None, ): - """ - Initialize the runtime. + """Initialize the runtime. Args: agent: The OpenAI Agent to execute runtime_id: Unique identifier for this runtime instance entrypoint: Optional entrypoint name (for schema generation) session: Optional OpenAI Agents SDK session for persistent memory + storage: Optional storage for persisting RunState across process restarts """ self.agent: Agent = agent self.runtime_id: str = runtime_id or "default" self.entrypoint: str | None = entrypoint self._session = session - - # Detect context type from agent's generic parameter + self._storage = storage self._context_type: type[BaseModel] | None = get_agent_context_type(agent) async def execute( @@ -60,19 +65,7 @@ async def execute( input: dict[str, Any] | None = None, options: UiPathExecuteOptions | None = None, ) -> UiPathRuntimeResult: - """ - Execute the agent with the provided input and configuration. - - Args: - input: Input dictionary containing the message for the agent - options: Execution options - - Returns: - UiPathRuntimeResult with the agent's output - - Raises: - UiPathOpenAIAgentRuntimeError: If execution fails - """ + """Execute the agent with the provided input and configuration.""" try: result: UiPathRuntimeResult | None = None async for event in self._run_agent(input, options, stream_events=False): @@ -92,20 +85,7 @@ async def stream( input: dict[str, Any] | None = None, options: UiPathStreamOptions | None = None, ) -> AsyncGenerator[UiPathRuntimeEvent, None]: - """ - Stream agent execution events in real-time. - - Args: - input: Input dictionary containing the message for the agent - options: Stream options - - Yields: - UiPathRuntimeEvent instances during execution, - then the final UiPathRuntimeResult - - Raises: - UiPathOpenAIAgentRuntimeError: If execution fails - """ + """Stream agent execution events in real-time.""" try: async for event in self._run_agent(input, options, stream_events=True): yield event @@ -118,58 +98,11 @@ async def _run_agent( options: UiPathExecuteOptions | UiPathStreamOptions | None, stream_events: bool, ) -> AsyncGenerator[UiPathRuntimeEvent | UiPathRuntimeResult, None]: - """ - Core agent execution logic used by both execute() and stream(). - - Args: - input: Input dictionary - options: Execution/stream options - stream_events: Whether to stream events during execution - - Yields: - Runtime events if stream_events=True, then final result - """ - # Prepare input and context - agent_input, context = self._prepare_agent_input_and_context(input) - - # Run the agent with streaming if events requested - if stream_events: - # Use streaming for events - async for event_or_result in self._run_agent_streamed( - agent_input, context, options, stream_events - ): - yield event_or_result - else: - # Use non-streaming for simple execution - result = await Runner.run( - starting_agent=self.agent, - input=agent_input, - context=context, - session=self._session, - ) - yield self._create_success_result(result.final_output) - - async def _run_agent_streamed( - self, - agent_input: str | list[Any], - context: Any | None, - options: UiPathExecuteOptions | UiPathStreamOptions | None, - stream_events: bool, - ) -> AsyncGenerator[UiPathRuntimeEvent | UiPathRuntimeResult, None]: - """ - Run agent using streaming API to enable event streaming. - - Args: - agent_input: Prepared agent input (string or list of messages) - context: Optional context object (Pydantic model instance) - options: Execution/stream options - stream_events: Whether to yield streaming events to caller - - Yields: - Runtime events if stream_events=True, then final result - """ + """Core agent execution logic used by both execute() and stream().""" + agent_input, context = await self._prepare_agent_input_and_context( + input, options + ) - # Use Runner.run_streamed() for streaming events (returns RunResultStreaming directly) result = Runner.run_streamed( starting_agent=self.agent, input=agent_input, @@ -177,122 +110,252 @@ async def _run_agent_streamed( session=self._session, ) - # Stream events from the agent async for event in result.stream_events(): - # Emit the event to caller if streaming is enabled if stream_events: runtime_event = self._convert_stream_event_to_runtime_event(event) if runtime_event: yield runtime_event - # Stream complete - yield final result + interruptions = list(result.interruptions or []) + if interruptions: + if self._storage is not None: + await self._save_run_state(result.to_state()) + + yield UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + output=self._build_suspend_output(interruptions), + ) + return + yield self._create_success_result(result.final_output) - def _convert_stream_event_to_runtime_event( - self, - event: Any, - ) -> UiPathRuntimeEvent | None: - """ - Convert OpenAI streaming event to UiPath runtime event. + # ------------------------------------------------------------------ + # Suspend / Resume + # ------------------------------------------------------------------ + + def _build_suspend_output( + self, interruptions: list[ToolApprovalItem] + ) -> dict[str, dict[str, Any]]: + """Build suspend output map from OpenAI SDK interruptions.""" + suspend_output: dict[str, dict[str, Any]] = {} + for index, interruption in enumerate(interruptions): + interrupt_id = self._interrupt_id(interruption, index) + raw_type = getattr(getattr(interruption, "raw_item", None), "type", None) + + if raw_type == "mcp_approval_request": + payload: dict[str, Any] = { + "type": "mcp_approval_request", + "approval_request_id": interrupt_id, + } + else: + payload = {"type": "tool_approval_request"} + suspend_output[interrupt_id] = payload + return suspend_output + + def _interrupt_id(self, interruption: ToolApprovalItem, index: int) -> str: + """Return stable interrupt id for an SDK interruption item.""" + # ToolApprovalItem.call_id already falls through to raw_item.id + call_id = interruption.call_id + if isinstance(call_id, str) and call_id: + return call_id + return f"interrupt-{index + 1}" + + async def _save_run_state(self, run_state: RunState[Any]) -> None: + """Persist OpenAI RunState JSON to runtime storage for resume.""" + if self._storage is None: + return + + run_state_json = run_state.to_json( + context_serializer=self._serialize_context_for_run_state + ) + await self._storage.set_value( + self.runtime_id, + _RUN_STATE_NAMESPACE, + _RUN_STATE_KEY, + run_state_json, + ) - Args: - event: Streaming event from Runner.run_streamed() + async def _load_run_state(self) -> RunState[Any] | None: + """Load persisted OpenAI RunState from runtime storage.""" + if self._storage is None: + return None - Returns: - UiPathRuntimeEvent or None if event should be filtered - """ + state_json = await self._storage.get_value( + self.runtime_id, + _RUN_STATE_NAMESPACE, + _RUN_STATE_KEY, + ) + if not isinstance(state_json, dict): + return None - event_type = getattr(event, "type", None) - event_name = getattr(event, "name", None) + context_deserializer = ( + self._deserialize_context_for_run_state + if self._context_type is not None + else None + ) + maybe_run_state = RunState.from_json( + initial_agent=self.agent, + state_json=state_json, + context_deserializer=context_deserializer, + ) + if inspect.isawaitable(maybe_run_state): + return await maybe_run_state + return maybe_run_state - # Handle run item events (messages, tool calls, etc.) - if event_type == "run_item_stream_event": - event_item = getattr(event, "item", None) - if event_item: - # Determine if this is a message or state event - if event_name in ["message_output_created", "reasoning_item_created"]: - return UiPathRuntimeMessageEvent( - payload=json.loads(serialize_json(event_item)), - metadata={"event_name": event_name}, - ) - else: - return UiPathRuntimeStateEvent( - payload=json.loads(serialize_json(event_item)), - metadata={"event_name": event_name}, - ) + def _apply_resume_decisions( + self, + run_state: RunState[Any], + resume_input: dict[str, Any], + ) -> None: + """Apply UiPath resume decisions to OpenAI RunState interruptions.""" + interruptions = run_state.get_interruptions() + if not interruptions: + return + + interruption_by_id: dict[str, ToolApprovalItem] = { + self._interrupt_id(interruption, index): interruption + for index, interruption in enumerate(interruptions) + } + + for interrupt_id, resume_value in resume_input.items(): + if interrupt_id == "messages": + continue + + interruption = interruption_by_id.get(interrupt_id) + if interruption is None: + continue + + if self._is_approved(resume_value): + run_state.approve(interruption) + else: + run_state.reject(interruption) + + @staticmethod + def _is_approved(value: Any) -> bool: + """Extract approval decision from a resume value. + + Accepts a plain bool or a dict with an "approve" key + (e.g. MCP approval response payloads). + """ + if isinstance(value, bool): + return value + if isinstance(value, BaseModel): + value = value.model_dump(exclude_unset=True) + if isinstance(value, dict): + return bool(value.get("approve", False)) + return False + + # ------------------------------------------------------------------ + # Context serialization for RunState + # ------------------------------------------------------------------ + + def _serialize_context_for_run_state(self, context: Any) -> dict[str, Any]: + """Serialize run context into a JSON-compatible mapping for RunState.""" + if context is None: + return {} + if isinstance(context, dict): + return context + if isinstance(context, BaseModel): + return context.model_dump() + try: + result = json.loads(serialize_json(context)) + return result if isinstance(result, dict) else {"value": result} + except Exception: + return {"value": str(context)} + + def _deserialize_context_for_run_state(self, context: Mapping[str, Any]) -> Any: + """Restore context object from RunState serialized payload.""" + context_dict = dict(context) + if self._context_type is None: + return context_dict + try: + return self._context_type.model_validate(context_dict) + except Exception: + return context_dict - # Handle agent updated events - if event_type == "agent_updated_stream_event": - new_agent = getattr(event, "new_agent", None) - if new_agent: - return UiPathRuntimeStateEvent( - payload={"agent_name": getattr(new_agent, "name", "unknown")}, - metadata={"event_type": "agent_updated"}, - ) + # ------------------------------------------------------------------ + # Input preparation + # ------------------------------------------------------------------ - # Filter out raw response events (too granular) - return None + async def _prepare_agent_input_and_context( + self, + input: dict[str, Any] | None, + options: UiPathExecuteOptions | UiPathStreamOptions | None = None, + ) -> tuple[str | list[Any] | RunState[Any], Any | None]: + """Prepare agent input and context from UiPath input dictionary. - def _prepare_agent_input_and_context( - self, input: dict[str, Any] | None - ) -> tuple[str | list[Any], Any | None]: + On resume: loads persisted RunState, applies approval decisions, returns it. + Otherwise: extracts messages and optional context from input dict. """ - Prepare agent input and context from UiPath input dictionary. + if options and options.resume: + run_state = await self._load_run_state() + if run_state is None: + raise RuntimeError( + f"Resume requested but no persisted run state found " + f"for runtime_id={self.runtime_id}" + ) + if input: + self._apply_resume_decisions(run_state, input) + return run_state, None - - 'messages' field is always extracted as the LLM input - - If agent has a context type, remaining fields are parsed into Pydantic model - """ if not input: return "", None - # Extract messages (always goes to LLM) messages = input.get("messages", "") if not isinstance(messages, (str, list)): messages = "" - # If agent has a context type, parse remaining fields into context context = None if self._context_type is not None: try: context = parse_input_to_context(input, self._context_type) except ValueError: - pass # Fallback to no context if parsing fails + pass return messages, context - def _serialize_message(self, message: Any) -> dict[str, Any]: - """ - Serialize an agent message for event streaming. - - Args: - message: Message object from the agent + # ------------------------------------------------------------------ + # Event conversion + # ------------------------------------------------------------------ - Returns: - Dictionary representation of the message - """ - serialized = json.loads(serialize_json(message)) + def _convert_stream_event_to_runtime_event( + self, + event: Any, + ) -> UiPathRuntimeEvent | None: + """Convert OpenAI streaming event to UiPath runtime event.""" + event_type = getattr(event, "type", None) + event_name = getattr(event, "name", None) - # Ensure the result is a dictionary - if isinstance(serialized, dict): - return serialized + if event_type == "run_item_stream_event": + event_item = getattr(event, "item", None) + if event_item: + if event_name in ["message_output_created", "reasoning_item_created"]: + return UiPathRuntimeMessageEvent( + payload=json.loads(serialize_json(event_item)), + metadata={"event_name": event_name}, + ) + return UiPathRuntimeStateEvent( + payload=json.loads(serialize_json(event_item)), + metadata={"event_name": event_name}, + ) - # Fallback to wrapping in a content field - return {"content": serialized} + if event_type == "agent_updated_stream_event": + new_agent = getattr(event, "new_agent", None) + if new_agent: + return UiPathRuntimeStateEvent( + payload={"agent_name": getattr(new_agent, "name", "unknown")}, + metadata={"event_type": "agent_updated"}, + ) - def _create_success_result(self, output: Any) -> UiPathRuntimeResult: - """ - Create result for successful completion. + return None - Args: - output: The agent's output + # ------------------------------------------------------------------ + # Result / error helpers + # ------------------------------------------------------------------ - Returns: - UiPathRuntimeResult with serialized output - """ - # Serialize output + def _create_success_result(self, output: Any) -> UiPathRuntimeResult: + """Create result for successful completion.""" serialized_output = json.loads(serialize_json(output)) - - # Ensure output is a dictionary if not isinstance(serialized_output, dict): serialized_output = {"result": serialized_output} @@ -302,15 +365,7 @@ def _create_success_result(self, output: Any) -> UiPathRuntimeResult: ) def _create_runtime_error(self, e: Exception) -> UiPathOpenAIAgentsRuntimeError: - """ - Handle execution errors and create appropriate runtime error. - - Args: - e: The exception that occurred - - Returns: - UiPathOpenAIAgentsRuntimeError with appropriate error code - """ + """Handle execution errors and create appropriate runtime error.""" if isinstance(e, UiPathOpenAIAgentsRuntimeError): return e @@ -340,12 +395,7 @@ def _create_runtime_error(self, e: Exception) -> UiPathOpenAIAgentsRuntimeError: ) async def get_schema(self) -> UiPathRuntimeSchema: - """ - Get schema for this OpenAI Agent runtime. - - Returns: - UiPathRuntimeSchema with input/output schemas and graph structure - """ + """Get schema for this OpenAI Agent runtime.""" entrypoints_schema = get_entrypoints_schema(self.agent) return UiPathRuntimeSchema( diff --git a/packages/uipath-openai-agents/uv.lock b/packages/uipath-openai-agents/uv.lock index b56878df..7f343e31 100644 --- a/packages/uipath-openai-agents/uv.lock +++ b/packages/uipath-openai-agents/uv.lock @@ -1226,7 +1226,7 @@ wheels = [ [[package]] name = "openai" -version = "2.15.0" +version = "2.26.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -1238,14 +1238,14 @@ dependencies = [ { name = "tqdm" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/94/f4/4690ecb5d70023ce6bfcfeabfe717020f654bde59a775058ec6ac4692463/openai-2.15.0.tar.gz", hash = "sha256:42eb8cbb407d84770633f31bf727d4ffb4138711c670565a41663d9439174fba", size = 627383, upload-time = "2026-01-09T22:10:08.603Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d7/91/2a06c4e9597c338cac1e5e5a8dd6f29e1836fc229c4c523529dca387fda8/openai-2.26.0.tar.gz", hash = "sha256:b41f37c140ae0034a6e92b0c509376d907f3a66109935fba2c1b471a7c05a8fb", size = 666702, upload-time = "2026-03-05T23:17:35.874Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/b5/df/c306f7375d42bafb379934c2df4c2fa3964656c8c782bac75ee10c102818/openai-2.15.0-py3-none-any.whl", hash = "sha256:6ae23b932cd7230f7244e52954daa6602716d6b9bf235401a107af731baea6c3", size = 1067879, upload-time = "2026-01-09T22:10:06.446Z" }, + { url = "https://files.pythonhosted.org/packages/c6/2e/3f73e8ca53718952222cacd0cf7eecc9db439d020f0c1fe7ae717e4e199a/openai-2.26.0-py3-none-any.whl", hash = "sha256:6151bf8f83802f036117f06cc8a57b3a4da60da9926826cc96747888b57f394f", size = 1136409, upload-time = "2026-03-05T23:17:34.072Z" }, ] [[package]] name = "openai-agents" -version = "0.6.8" +version = "0.11.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "griffe" }, @@ -1256,9 +1256,9 @@ dependencies = [ { name = "types-requests" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/7a/48/4e09b9bbb5b1e26750fcde175e21a5f1600b24d7406f69c183ecef36adec/openai_agents-0.6.8.tar.gz", hash = "sha256:2daa041db0218080b973b3f6b91d3bd1365c4ee21056df71b9c4aa5ffe0bdfeb", size = 2148265, upload-time = "2026-01-19T01:40:46.576Z" } +sdist = { url = "https://files.pythonhosted.org/packages/08/5e/79875ab7f0f2da8247d76616001ab3a82f6b128262a5c69367530689e69c/openai_agents-0.11.1.tar.gz", hash = "sha256:b2bec1a780a2e2f2419e9688931eb65649bb5283f99e946018d4f1b67d4e95ca", size = 2582366, upload-time = "2026-03-09T06:34:07.701Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/c1/94/422f153896bd64d218845da92f774e4f016926b5347f340f59a3b0ed7856/openai_agents-0.6.8-py3-none-any.whl", hash = "sha256:66d2920aa1c71e6529f2f2eda46eba66b0508788d7af28feb50586901ccb5c66", size = 283322, upload-time = "2026-01-19T01:40:44.261Z" }, + { url = "https://files.pythonhosted.org/packages/f7/e9/d8d8a39a2e3c5fb1a538a13a6928f4223ff6664b8ba2a6137187b0f69370/openai_agents-0.11.1-py3-none-any.whl", hash = "sha256:4fda67bfe2aab4a1cd4a701d4e8d3d1eb849ba66aeea51295dbedf8a9e52cdb1", size = 434624, upload-time = "2026-03-09T06:34:05.653Z" }, ] [[package]] @@ -2329,7 +2329,7 @@ wheels = [ [[package]] name = "uipath-openai-agents" -version = "0.0.9" +version = "0.0.10" source = { editable = "." } dependencies = [ { name = "aiosqlite" }, @@ -2355,7 +2355,7 @@ dev = [ requires-dist = [ { name = "aiosqlite", specifier = ">=0.20.0" }, { name = "openai", specifier = ">=1.0.0" }, - { name = "openai-agents", specifier = ">=0.6.5" }, + { name = "openai-agents", specifier = ">=0.11.1" }, { name = "openinference-instrumentation-openai-agents", specifier = ">=1.4.0" }, { name = "uipath", specifier = ">=2.10.0,<2.11.0" }, { name = "uipath-runtime", specifier = ">=0.9.0,<0.10.0" }, From 56807908ce0bd9d9a715fd56dd36f1536c01917b Mon Sep 17 00:00:00 2001 From: Cosmin Paunel Date: Tue, 10 Mar 2026 00:51:30 +0200 Subject: [PATCH 3/3] fix: test --- packages/uipath-openai-agents/tests/conftest.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/uipath-openai-agents/tests/conftest.py b/packages/uipath-openai-agents/tests/conftest.py index 3dc776ae..c18bfffa 100644 --- a/packages/uipath-openai-agents/tests/conftest.py +++ b/packages/uipath-openai-agents/tests/conftest.py @@ -1,3 +1,4 @@ +import shutil import tempfile from typing import Generator @@ -13,6 +14,11 @@ def runner() -> CliRunner: @pytest.fixture def temp_dir() -> Generator[str, None, None]: - """Provide a temporary directory for test files.""" - with tempfile.TemporaryDirectory() as tmp_dir: - yield tmp_dir + """Provide a temporary directory for test files. + + Cleanup ignores errors to avoid PermissionError on Windows when + SQLiteSession worker-thread connections still hold the state.db file open. + """ + tmp_dir = tempfile.mkdtemp() + yield tmp_dir + shutil.rmtree(tmp_dir, ignore_errors=True)