Python: Add create_checkpoint to workflow#6407
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces an explicit “reset between independent runs” capability for Python workflows, so a single Workflow instance (and its executors/orchestrators) can be safely reused without leaking in-memory state across unrelated jobs/conversations. It also tightens concurrent-run protection by reserving the runner synchronously, and wires reset behavior into orchestrations and Foundry hosting.
Changes:
- Add
Workflow.reset_for_new_run()/Runner.reset_for_new_run()plus per-executorreset()hooks, and switch concurrency guarding to a synchronousRunner.reserve()lifecycle. - Implement reset behavior for key executors/orchestrators (e.g.,
AgentExecutor,WorkflowExecutor, group chat + Magentic orchestrators) and validate via new unit tests. - Add a new workflow sample (
workflow_reset.py) and index it in the workflows samples README; update Foundry hosting to reset workflows when there’s no prior conversation context to restore.
Reviewed changes
Copilot reviewed 22 out of 23 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| python/samples/03-workflows/state-management/workflow_reset.py | New sample demonstrating Workflow.reset_for_new_run() for per-job state isolation. |
| python/samples/03-workflows/README.md | Adds the new reset sample to the workflows samples table. |
| python/packages/orchestrations/tests/test_group_chat.py | Adds reset-focused tests for group chat orchestrators and end-to-end workflow reset. |
| python/packages/orchestrations/agent_framework_orchestrations/_magentic.py | Adds Magentic-specific _reset_pattern_state() implementation. |
| python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py | Clears per-run cache and conditionally rotates/preserves sessions on reset. |
| python/packages/orchestrations/agent_framework_orchestrations/_base_group_chat_orchestrator.py | Adds an async reset() that clears shared conversation state and calls a pattern hook. |
| python/packages/foundry_hosting/tests/test_responses.py | Adds tests ensuring hosting resets workflows for independent requests (and doesn’t reset when restoring checkpoints). |
| python/packages/foundry_hosting/agent_framework_foundry_hosting/_responses.py | Resets the workflow when there is no prior checkpoint context to restore. |
| python/packages/core/tests/workflow/test_workflow.py | Updates concurrency exception expectations; adds Workflow.reset_for_new_run() coverage. |
| python/packages/core/tests/workflow/test_sub_workflow.py | Adds reset coverage for WorkflowExecutor (sub-workflows). |
| python/packages/core/tests/workflow/test_runner.py | Adds reservation lifecycle + runner reset tests; updates existing tests to call reserve(). |
| python/packages/core/tests/workflow/test_executor.py | Adds tests for default and overridden Executor.reset(). |
| python/packages/core/tests/workflow/test_agent_executor.py | Adds reset tests for AgentExecutor, including session ownership behavior. |
| python/packages/core/tests/core/test_mcp_skills.py | Formatting-only refactors for test readability (no behavior changes). |
| python/packages/core/tests/core/test_mcp_observability.py | Formatting-only refactors for test readability (no behavior changes). |
| python/packages/core/agent_framework/_workflows/_workflow.py | Switches to runner reservation for concurrency; stores run kwargs via Runner.state; adds reset_for_new_run(). |
| python/packages/core/agent_framework/_workflows/_workflow_executor.py | Adds WorkflowExecutor.reset() to clear in-flight execution mappings and reset wrapped workflow. |
| python/packages/core/agent_framework/_workflows/_runner.py | Introduces runner lifecycle enum, synchronous reserve()/release(), and reset_for_new_run() that clears state/context and calls executor.reset(). |
| python/packages/core/agent_framework/_workflows/_executor.py | Adds async Executor.reset() hook (default no-op) for workflow resets. |
| python/packages/core/agent_framework/_workflows/_agent_executor.py | Makes reset async; clears per-run buffers and rotates session only when auto-created (warns when caller-supplied). |
| python/packages/core/agent_framework/_skills.py | Small formatting simplifications. |
| python/packages/core/agent_framework/_mcp.py | Small formatting simplifications. |
There was a problem hiding this comment.
Automated Code Review
Reviewers: 4 | Confidence: 63%
✓ Security Reliability
No actionable issues found in this dimension.
✓ Test Coverage
The PR adds extensive test coverage for the new workflow reset functionality across Runner, AgentExecutor, WorkflowExecutor, Workflow, base orchestrator, and the foundry hosting layer. The one notable gap is that MagenticOrchestrator._reset_pattern_state() — which clears four state fields — has no dedicated unit test, unlike the analogous AgentBasedGroupChatOrchestrator._reset_pattern_state() which is tested in test_group_chat.py.
✓ Failure Modes
The PR adds a well-designed workflow reset mechanism with proper lifecycle state management (IDLE/RESERVED/RUNNING enum replacing a boolean flag) and comprehensive test coverage. The main concern is that Runner.reset_for_new_run() mutates shared state (iteration counter, context, State) before iterating over executors to call reset(). If any executor's reset() raises (e.g., AgentExecutor.create_session() fails for a remote agent), the runner is left in an inconsistent half-reset state with no rollback path. The formatting changes are purely cosmetic and correct.
✓ Design Approach
I found one design issue in the workflow run-lock changes. The new synchronous
Runner.reserve()call correctly blocks races before the firstawait, but this code path no longer attaches aResponseStreamcleanup hook. That means a caller can obtainworkflow.run(..., stream=True)and drop the returned stream before iteration starts, leaving the runner permanently reserved and causing later runs on the same workflow instance to fail as if one were still in progress. That leaves users with two contradictory lifecycle contracts for the same type.
Automated review by TaoChenOSU's agents
There was a problem hiding this comment.
Automated Code Review
Reviewers: 5 | Confidence: 89%
✓ Correctness
No actionable issues found in this dimension.
✓ Security Reliability
The
InProcRunnerContext.reset_for_new_run()implementation does not clear_pending_request_info_events, violating the documented 'clean slate' contract ofWorkflow.reset_for_new_run()which states it resets EVERYTHING 'including pending requests/messages'. In the foundry-hosting use case (the motivating scenario), stale request_info events from a prior workflow run would persist after reset, constituting a state leak.
✓ Test Coverage
The PR adds comprehensive test coverage for the new
reset_for_new_runfeature across Workflow, Runner, AgentExecutor, WorkflowExecutor, and orchestrators. However, two test coverage gaps stand out: (1)InProcRunnerContext.reset_for_new_run()does not clear_pending_request_info_events, and the tests don't verify this behavior—meaning pending approval requests from a prior run could leak into a fresh run after reset, contradicting the documented guarantee; (2)MagenticOrchestrator._reset_pattern_state()is new production code with no dedicated test. The group chat orchestrator reset tests are well-structured and cover the base class, agent-based subclass, session management, and end-to-end workflow reset. However, there is a notable test coverage gap: the MagenticOrchestrator._reset_pattern_state() override (which resets four critical fields including _terminated) has no dedicated test, despite _terminated being the guard that prevents reuse (line 914 raises RuntimeError if True).
✓ Failure Modes
The
InProcRunnerContext.reset_for_new_run()method (line 403-412 of_runner_context.py) clears messages, events, and the streaming flag, but does NOT clear_pending_request_info_events. WhenRunner.reset_for_new_run()callsself._ctx.reset_for_new_run()at line 99 of_runner.py, stale request_info events from a prior run survive the reset. This contradicts the PR's goal of ensuring 'no residual states' and can cause stale events to be baked into the next run's checkpoints (line 389 of_runner_context.py). TheMagenticOrchestrator._reset_pattern_state()resets the orchestrator's own fields but does not rotate or clear theStandardMagenticManager._session, which accumulates LM conversation history across every_complete()call. Afterreset_for_new_run(), subsequent agent calls reuse a session carrying stale context from the prior run. The checkpoint logic explicitly saves/restores this session (line 746), confirming it holds run-significant state.
✗ Design Approach
The new reset API leaves one class of per-run state behind: pending
request_infobookkeeping in the runner context. That meansreset_for_new_run()does not actually provide the clean-slate semantics this PR is introducing for hosted-agent scenarios, and stale request IDs can still be consumed after a reset. The new group-chat reset hook mostly covers orchestrator-local state, but Magentic still leaves its manager-owned agent session alive acrossreset_for_new_run(). That means one important path can keep prior-run context even after a reset, which undercuts the PR's goal of reusing a workflow instance for independent runs without residual state.
Flagged Issues
-
_magentic.py:1267:_reset_pattern_state()resets only orchestrator fields, butStandardMagenticManagercreates a long-lived session once (_magentic.py:571-572) and reuses it for every planner call (_magentic.py:601). The code pathworkflow.run(...); workflow.reset_for_new_run(); workflow.run(...)still sends the second run through the original manager session carrying stale conversation history. The existing agent-backed group chat reset path shows the intended pattern by rotating auto-created sessions on reset (_group_chat.py:349-358).
Suggestions
- Add a test verifying that
InProcRunnerContext.reset_for_new_run()clears_pending_request_info_events. The current implementation at_runner_context.py:403-412does NOT clear them, yetWorkflow.reset_for_new_run()documents: 'This will reset EVERYTHING... including pending requests/messages.' This is the exact scenario from the PR motivation (Foundry hosted agent receiving a new message without prior context).
Automated review by TaoChenOSU's agents
create_checkpoint to workflow
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
python/packages/core/agent_framework/init.py:292
- This change removes
Runnerfrom the package-level public exports (agent_framework.Runner). Since it was previously part of__init__.py(and__all__), this is an API surface reduction that can break downstream imports even if internal code no longer uses it.
If Runner is intended to become non-public, consider leaving a deprecated re-export for at least one release, or (if removal is required immediately) mark the PR as breaking and document the change in release notes/changelog.
step,
workflow,
)
from ._workflows._request_info_mixin import response_handler
from ._workflows._runner_context import (
InProcRunnerContext,
RunnerContext,
WorkflowMessage,
There was a problem hiding this comment.
Automated Code Review
Reviewers: 1 | Confidence: 92%
✗ Design Approach
I found two correctness issues in the new checkpointing/run-lifecycle design.
Workflow.create_checkpoint()now treats "idle" as merely "no live ResponseStream", which allows callers to snapshot a workflow that still has in-flight executor messages from an incomplete run. Separately,Runneronly clears its resumed flag on the success path, so a failed resumed run leaks resume state into the next run and silently skips that run’s initial checkpoint.
Flagged Issues
- python/packages/core/agent_framework/_workflows/_workflow.py:493 allows create_checkpoint() to persist a non-idle runner if a prior run cleared _active_run but left executor messages queued.
- python/packages/core/agent_framework/_workflows/_runner.py:163 clears _resumed_from_checkpoint only on successful completion, so a failed resumed run is still treated as resumed on the next call and skips the superstep-0 checkpoint.
Automated review by TaoChenOSU's agents
|
Flagged issue python/packages/core/agent_framework/_workflows/_workflow.py:493 allows create_checkpoint() to persist a non-idle runner if a prior run cleared _active_run but left executor messages queued. Source: automated DevFlow PR review |
|
Flagged issue python/packages/core/agent_framework/_workflows/_runner.py:163 clears _resumed_from_checkpoint only on successful completion, so a failed resumed run is still treated as resumed on the next call and skips the superstep-0 checkpoint. Source: automated DevFlow PR review |
| "RoleLiteral", | ||
| "RubricScore", | ||
| "RunContext", | ||
| "Runner", |
There was a problem hiding this comment.
Should we treat dropping Runner from the top-level exports as a breaking change? It has been in __all__ since #3908 and shipped in the released 1.x line, so from agent_framework import Runner now raises ImportError for anyone who imported it, yet the PR is not marked [BREAKING]. Could we either keep a lazy __getattr__ re-export with a DeprecationWarning, or add the [BREAKING] prefix and note the removal explicitly?
| # caller's stream is later garbage collected, ``_active_run`` is already | ||
| # ``None`` (or has been replaced by a newer run's weakref) - no GC-time | ||
| # finalizer is needed. | ||
| self._active_run = None |
There was a problem hiding this comment.
Could a late finalizer here reset a newer run's guard? _run_core's finally clears self._active_run unconditionally. If a caller abandons a partially consumed stream and then starts another run, the abandoned async generator is closed by asyncio's async-gen finalizer hook at GC time, which runs this finally and sets _active_run = None while the new run is still live.
I confirmed the ordering locally: the dropped generator's finally fires on a later loop tick and overwrites the value the new run just set. That silently reopens the concurrency guard, so a subsequent run() (or a create_checkpoint(), which also gates only on _is_run_active()) could proceed against an already-active run and mutate shared runner/context/state concurrently. This is exactly the "replaced by a newer run's weakref" case the inline comment assumes is safe.
Should we compare-and-clear instead, only resetting when self._active_run() is still this run's own stream, so a stale finalizer can't clobber a newer run?
Motivation and Context
When a workflow is hosted as an agent in Foundry Hosted Agent, a new message comes in without a conversation ID or a previous response ID should trigger a new workflow run, without any residual states in the workflow from previous interactions.
This is currently not possible unless a new workflow instance is built from the builder, which is impossible in hosted agent since the workflow instance is only created once when a session starts. This PR addresses the limitation by introducing
create_checkpointon the workflow class, which allows clients to create checkpoints at arbitrary point in time while a workflow is idle, including the very beginning where the workflow hasn't received any messages yet. This checkpoint can be then used to restore the workflow to its initial state.Description
create_checkpointto workflow.create_checkpointin_responses.pyto save the initial state of a workflow and use the checkpoint to restore the workflow to its initial state when a request doesn't have a previous response id nor conversation id.Contribution Checklist