diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs index d3f229a7da9..7fd10b568e2 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs @@ -60,9 +60,6 @@ private InProcessRunner(Workflow workflow, ICheckpointManager? checkpointManager this._knownValidInputTypes = knownValidInputTypes != null ? [.. knownValidInputTypes] : []; - - // Initialize the runners for each of the edges, along with the state for edges that need it. - this.EdgeMap = new EdgeMap(this.RunContext, this.Workflow.Edges, this.Workflow.Ports.Values, this.Workflow.StartExecutorId, this.StepTracer); } /// @@ -154,7 +151,6 @@ ValueTask ISuperStepRunner.EnqueueResponseAsync(ExternalResponse response, Cance private Workflow Workflow { get; init; } internal InProcessRunnerContext RunContext { get; init; } private ICheckpointManager? CheckpointManager { get; } - private EdgeMap EdgeMap { get; init; } public ConcurrentEventSink OutgoingEvents { get; } = new(); @@ -358,7 +354,7 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d // Create a representation of the current workflow if it does not already exist. this._workflowInfoCache ??= this.Workflow.ToWorkflowInfo(); - Dictionary edgeData = await this.EdgeMap.ExportStateAsync().ConfigureAwait(false); + Dictionary edgeData = await this.RunContext.ExportEdgeStateAsync().ConfigureAwait(false); await prepareTask.ConfigureAwait(false); await this.RunContext.StateManager.PublishUpdatesAsync(this.StepTracer).ConfigureAwait(false); @@ -422,7 +418,7 @@ private async ValueTask RestoreCheckpointCoreAsync(CheckpointInfo checkpointInfo Task executorNotifyTask = this.RunContext.NotifyCheckpointLoadedAsync(cancellationToken); - await this.EdgeMap.ImportStateAsync(checkpoint).ConfigureAwait(false); + await this.RunContext.ImportEdgeStateAsync(checkpoint).ConfigureAwait(false); await Task.WhenAll(executorNotifyTask, restoreCheckpointIndexTask.AsTask()).ConfigureAwait(false); diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index 8201f8d8fe2..ba071ec712a 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -411,6 +411,20 @@ internal ValueTask ExportStateAsync() return new(result); } + internal ValueTask> ExportEdgeStateAsync() + { + this.CheckEnded(); + + return this._edgeMap.ExportStateAsync(); + } + + internal ValueTask ImportEdgeStateAsync(Checkpoint checkpoint) + { + this.CheckEnded(); + + return this._edgeMap.ImportStateAsync(checkpoint); + } + internal async ValueTask RepublishUnservicedRequestsAsync(CancellationToken cancellationToken = default) { this.CheckEnded(); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs index 53ea6447125..2fcea707ee0 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs @@ -321,6 +321,72 @@ internal async Task Checkpoint_Restore_ClearsQueuedExternalResponsesBeforeImport "the workflow should finish once the replayed request receives a fresh response"); } + /// + /// Verifies that fan-in edge state buffered before a checkpoint is still present after resume. + /// + [Theory] + [InlineData(ExecutionEnvironment.InProcess_OffThread)] + [InlineData(ExecutionEnvironment.InProcess_Lockstep)] + internal async Task Checkpoint_Resume_PreservesFanInBarrierBufferedMessagesAsync(ExecutionEnvironment environment) + { + // Arrange + const string RequestPortId = "Approval"; + const string SinkId = "Sink"; + + ExecutorBinding beforePause = new PreCheckpointBarrierSource("BeforePause", RequestPortId, SinkId); + ExecutorBinding afterResume = new PostCheckpointBarrierSource("AfterResume", SinkId); + ExecutorBinding sink = new BarrierSink(SinkId); + RequestPort requestPort = RequestPort.Create(RequestPortId); + + Workflow workflow = new WorkflowBuilder(beforePause) + .AddEdge(beforePause, requestPort) + .AddEdge(requestPort, afterResume) + .AddFanInBarrierEdge([beforePause, afterResume], sink) + .Build(); + + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment(); + + ExternalRequest pendingRequest; + CheckpointInfo checkpoint; + + await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager) + .RunStreamingAsync(workflow, "start")) + { + (pendingRequest, checkpoint) = await CapturePendingRequestAndCheckpointAsync(firstRun); + } + + // Act + await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager) + .ResumeStreamingAsync(workflow, checkpoint); + + List resumedEvents = await ReadToHaltAsync(resumed); + ExternalRequest replayedRequest = resumedEvents.OfType() + .Select(evt => evt.Request) + .Should() + .ContainSingle("resume should replay the request captured after the first fan-in source") + .Subject; + + await resumed.SendResponseAsync(replayedRequest.CreateResponse(new ApprovalReply("yes"))); + + List completionEvents = await ReadToHaltAsync(resumed); + + // Assert + completionEvents.OfType().Should().BeEmpty( + "resuming across a partially satisfied fan-in barrier should not raise workflow errors"); + + string[] outputs = [.. completionEvents.OfType().Select(evt => evt.Source)]; + outputs.Should().BeEquivalentTo(["before", "after"], + "the barrier should release the contribution buffered before the checkpoint and the one produced after resume"); + + RunStatus status = await resumed.GetStatusAsync(); + status.Should().Be(RunStatus.Idle, + "the fan-in target should run after the post-resume source contributes"); + + pendingRequest.RequestId.Should().Be(replayedRequest.RequestId, + "the replayed request should be the one from the checkpointed superstep"); + } + /// /// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow. /// @@ -484,4 +550,48 @@ private static async ValueTask> ReadToHaltAsync(StreamingRun return events; } + + private sealed record BarrierContribution(string Source); + + private sealed record ApprovalRequest(string Prompt); + + private sealed record ApprovalReply(string Value); + + private sealed class BarrierReleasedEvent(string source) : WorkflowEvent + { + public string Source { get; } = source; + } + + private sealed class PreCheckpointBarrierSource(string id, string requestPortId, string sinkId) : Executor(id) + { + protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) + => protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler(this.HandleAsync)) + .SendsMessage() + .SendsMessage(); + + private async ValueTask HandleAsync(string input, IWorkflowContext ctx) + { + await ctx.SendMessageAsync(new BarrierContribution("before"), sinkId).ConfigureAwait(false); + await ctx.SendMessageAsync(new ApprovalRequest("continue?"), requestPortId).ConfigureAwait(false); + } + } + + private sealed class PostCheckpointBarrierSource(string id, string sinkId) : Executor(id) + { + protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) + => protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler(this.HandleAsync)) + .SendsMessage(); + + private ValueTask HandleAsync(ApprovalReply reply, IWorkflowContext ctx) + => ctx.SendMessageAsync(new BarrierContribution("after"), sinkId); + } + + private sealed class BarrierSink(string id) : Executor(id) + { + protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) + => protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler(this.HandleAsync)); + + private ValueTask HandleAsync(BarrierContribution contribution, IWorkflowContext ctx) + => ctx.AddEventAsync(new BarrierReleasedEvent(contribution.Source)); + } }