From aa0ec893ea20d59670f594d6d5721800bd1cce09 Mon Sep 17 00:00:00 2001
From: Yufeng He <40085740+he-yufeng@users.noreply.github.com>
Date: Fri, 12 Jun 2026 13:53:04 +0800
Subject: [PATCH] .NET: fix fan-in checkpoint edge state
---
.../InProc/InProcessRunner.cs | 8 +-
.../InProc/InProcessRunnerContext.cs | 14 +++
.../CheckpointResumeTests.cs | 110 ++++++++++++++++++
3 files changed, 126 insertions(+), 6 deletions(-)
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs
index d3f229a7da9..7fd10b568e2 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs
@@ -60,9 +60,6 @@ private InProcessRunner(Workflow workflow, ICheckpointManager? checkpointManager
this._knownValidInputTypes = knownValidInputTypes != null
? [.. knownValidInputTypes]
: [];
-
- // Initialize the runners for each of the edges, along with the state for edges that need it.
- this.EdgeMap = new EdgeMap(this.RunContext, this.Workflow.Edges, this.Workflow.Ports.Values, this.Workflow.StartExecutorId, this.StepTracer);
}
///
@@ -154,7 +151,6 @@ ValueTask ISuperStepRunner.EnqueueResponseAsync(ExternalResponse response, Cance
private Workflow Workflow { get; init; }
internal InProcessRunnerContext RunContext { get; init; }
private ICheckpointManager? CheckpointManager { get; }
- private EdgeMap EdgeMap { get; init; }
public ConcurrentEventSink OutgoingEvents { get; } = new();
@@ -358,7 +354,7 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
// Create a representation of the current workflow if it does not already exist.
this._workflowInfoCache ??= this.Workflow.ToWorkflowInfo();
- Dictionary edgeData = await this.EdgeMap.ExportStateAsync().ConfigureAwait(false);
+ Dictionary edgeData = await this.RunContext.ExportEdgeStateAsync().ConfigureAwait(false);
await prepareTask.ConfigureAwait(false);
await this.RunContext.StateManager.PublishUpdatesAsync(this.StepTracer).ConfigureAwait(false);
@@ -422,7 +418,7 @@ private async ValueTask RestoreCheckpointCoreAsync(CheckpointInfo checkpointInfo
Task executorNotifyTask = this.RunContext.NotifyCheckpointLoadedAsync(cancellationToken);
- await this.EdgeMap.ImportStateAsync(checkpoint).ConfigureAwait(false);
+ await this.RunContext.ImportEdgeStateAsync(checkpoint).ConfigureAwait(false);
await Task.WhenAll(executorNotifyTask,
restoreCheckpointIndexTask.AsTask()).ConfigureAwait(false);
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs
index 8201f8d8fe2..ba071ec712a 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs
@@ -411,6 +411,20 @@ internal ValueTask ExportStateAsync()
return new(result);
}
+    internal ValueTask<Dictionary<EdgeId, PortableValue>> ExportEdgeStateAsync() // Exposes the context-owned edge map's state so the runner can checkpoint it.
+    {
+        this.CheckEnded(); // Same guard RepublishUnservicedRequestsAsync runs first; presumably rejects use after the run has ended — confirm in CheckEnded.
+
+        return this._edgeMap.ExportStateAsync(); // NOTE(review): return type arguments were lost in the garbled patch text; restored as EdgeId/PortableValue — confirm against EdgeMap.ExportStateAsync.
+    }
+
+ internal ValueTask ImportEdgeStateAsync(Checkpoint checkpoint) // Forwards a checkpoint restore to the context-owned edge map.
+ {
+ this.CheckEnded(); // NOTE(review): presumably rejects use after the run has ended — confirm in CheckEnded.
+
+ return this._edgeMap.ImportStateAsync(checkpoint); // Returned without awaiting here; the runner awaits the resulting ValueTask.
+ }
+
internal async ValueTask RepublishUnservicedRequestsAsync(CancellationToken cancellationToken = default)
{
this.CheckEnded();
diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs
index 53ea6447125..2fcea707ee0 100644
--- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs
@@ -321,6 +321,72 @@ internal async Task Checkpoint_Restore_ClearsQueuedExternalResponsesBeforeImport
"the workflow should finish once the replayed request receives a fresh response");
}
+    /// <summary>
+    /// Verifies that fan-in edge state buffered before a checkpoint is still present after resume.
+    /// </summary>
+    [Theory]
+    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
+    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
+    internal async Task Checkpoint_Resume_PreservesFanInBarrierBufferedMessagesAsync(ExecutionEnvironment environment)
+    {
+        // Arrange: "BeforePause" contributes to the barrier and raises the approval request; "AfterResume" contributes only once the reply arrives.
+        const string RequestPortId = "Approval";
+        const string SinkId = "Sink";
+
+        ExecutorBinding beforePause = new PreCheckpointBarrierSource("BeforePause", RequestPortId, SinkId);
+        ExecutorBinding afterResume = new PostCheckpointBarrierSource("AfterResume", SinkId);
+        ExecutorBinding sink = new BarrierSink(SinkId);
+        RequestPort requestPort = RequestPort.Create<ApprovalRequest, ApprovalReply>(RequestPortId);
+
+        Workflow workflow = new WorkflowBuilder(beforePause)
+            .AddEdge(beforePause, requestPort)
+            .AddEdge(requestPort, afterResume)
+            .AddFanInBarrierEdge([beforePause, afterResume], sink)
+            .Build();
+
+        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
+        InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
+
+        ExternalRequest pendingRequest;
+        CheckpointInfo checkpoint;
+
+        await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
+                                                      .RunStreamingAsync(workflow, "start"))
+        {
+            (pendingRequest, checkpoint) = await CapturePendingRequestAndCheckpointAsync(firstRun);
+        }
+
+        // Act: resume from the captured checkpoint in a new run, then answer the replayed request.
+        await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
+                                                    .ResumeStreamingAsync(workflow, checkpoint);
+
+        List<WorkflowEvent> resumedEvents = await ReadToHaltAsync(resumed); // NOTE(review): generic type arguments in this test were restored from garbled patch text — confirm against the original commit.
+        ExternalRequest replayedRequest = resumedEvents.OfType<RequestInfoEvent>()
+                                                       .Select(evt => evt.Request)
+                                                       .Should()
+                                                       .ContainSingle("resume should replay the request captured after the first fan-in source")
+                                                       .Subject;
+
+        await resumed.SendResponseAsync(replayedRequest.CreateResponse(new ApprovalReply("yes")));
+
+        List<WorkflowEvent> completionEvents = await ReadToHaltAsync(resumed);
+
+        // Assert
+        completionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
+            "resuming across a partially satisfied fan-in barrier should not raise workflow errors");
+
+        string[] outputs = [.. completionEvents.OfType<BarrierReleasedEvent>().Select(evt => evt.Source)];
+        outputs.Should().BeEquivalentTo(["before", "after"],
+            "the barrier should release the contribution buffered before the checkpoint and the one produced after resume");
+
+        RunStatus status = await resumed.GetStatusAsync();
+        status.Should().Be(RunStatus.Idle,
+            "the fan-in target should run after the post-resume source contributes");
+
+        pendingRequest.RequestId.Should().Be(replayedRequest.RequestId,
+            "the replayed request should be the one from the checkpointed superstep");
+    }
+
///
/// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow.
///
@@ -484,4 +550,48 @@ private static async ValueTask> ReadToHaltAsync(StreamingRun
return events;
}
+
+ private sealed record BarrierContribution(string Source); // Message both barrier sources send to the sink; Source is "before" or "after" depending on the sender.
+
+ private sealed record ApprovalRequest(string Prompt); // Payload PreCheckpointBarrierSource sends to the request port.
+
+ private sealed record ApprovalReply(string Value); // Response payload the test supplies after resume; handled by PostCheckpointBarrierSource.
+
+ private sealed class BarrierReleasedEvent(string source) : WorkflowEvent // Event BarrierSink adds for each contribution it handles.
+ {
+ public string Source { get; } = source; // Carries BarrierContribution.Source; the test collects these values as the workflow's outputs.
+ }
+
+    private sealed class PreCheckpointBarrierSource(string id, string requestPortId, string sinkId) : Executor(id) // Handles the initial string input: contributes to the barrier, then raises the approval request.
+    {
+        protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) // NOTE(review): type arguments below were restored from garbled patch text — confirm against the original commit.
+            => protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler<string>(this.HandleAsync))
+                              .SendsMessage<BarrierContribution>()
+                              .SendsMessage<ApprovalRequest>();
+
+        private async ValueTask HandleAsync(string input, IWorkflowContext ctx)
+        {
+            await ctx.SendMessageAsync(new BarrierContribution("before"), sinkId).ConfigureAwait(false); // Barrier contribution is sent before the request, so it is already buffered when the run pauses.
+            await ctx.SendMessageAsync(new ApprovalRequest("continue?"), requestPortId).ConfigureAwait(false);
+        }
+    }
+
+    private sealed class PostCheckpointBarrierSource(string id, string sinkId) : Executor(id) // Contributes to the barrier only after an ApprovalReply arrives.
+    {
+        protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) // NOTE(review): type arguments below were restored from garbled patch text — confirm against the original commit.
+            => protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler<ApprovalReply>(this.HandleAsync))
+                              .SendsMessage<BarrierContribution>();
+
+        private ValueTask HandleAsync(ApprovalReply reply, IWorkflowContext ctx) // The reply's content is not inspected; its arrival alone triggers the contribution.
+            => ctx.SendMessageAsync(new BarrierContribution("after"), sinkId);
+    }
+
+    private sealed class BarrierSink(string id) : Executor(id) // Fan-in target: turns every contribution it receives into a BarrierReleasedEvent.
+    {
+        protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) // NOTE(review): the AddHandler type argument was restored from garbled patch text — confirm against the original commit.
+            => protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler<BarrierContribution>(this.HandleAsync));
+
+        private ValueTask HandleAsync(BarrierContribution contribution, IWorkflowContext ctx)
+            => ctx.AddEventAsync(new BarrierReleasedEvent(contribution.Source));
+    }
}