Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ private InProcessRunner(Workflow workflow, ICheckpointManager? checkpointManager
this._knownValidInputTypes = knownValidInputTypes != null
? [.. knownValidInputTypes]
: [];

// Initialize the runners for each of the edges, along with the state for edges that need it.
this.EdgeMap = new EdgeMap(this.RunContext, this.Workflow.Edges, this.Workflow.Ports.Values, this.Workflow.StartExecutorId, this.StepTracer);
}

/// <inheritdoc cref="ISuperStepRunner.SessionId"/>
Expand Down Expand Up @@ -154,7 +151,6 @@ ValueTask ISuperStepRunner.EnqueueResponseAsync(ExternalResponse response, Cance
private Workflow Workflow { get; init; }
internal InProcessRunnerContext RunContext { get; init; }
private ICheckpointManager? CheckpointManager { get; }
private EdgeMap EdgeMap { get; init; }

public ConcurrentEventSink OutgoingEvents { get; } = new();

Expand Down Expand Up @@ -358,7 +354,7 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
// Create a representation of the current workflow if it does not already exist.
this._workflowInfoCache ??= this.Workflow.ToWorkflowInfo();

Dictionary<EdgeId, PortableValue> edgeData = await this.EdgeMap.ExportStateAsync().ConfigureAwait(false);
Dictionary<EdgeId, PortableValue> edgeData = await this.RunContext.ExportEdgeStateAsync().ConfigureAwait(false);

await prepareTask.ConfigureAwait(false);
await this.RunContext.StateManager.PublishUpdatesAsync(this.StepTracer).ConfigureAwait(false);
Expand Down Expand Up @@ -422,7 +418,7 @@ private async ValueTask RestoreCheckpointCoreAsync(CheckpointInfo checkpointInfo

Task executorNotifyTask = this.RunContext.NotifyCheckpointLoadedAsync(cancellationToken);

await this.EdgeMap.ImportStateAsync(checkpoint).ConfigureAwait(false);
await this.RunContext.ImportEdgeStateAsync(checkpoint).ConfigureAwait(false);
await Task.WhenAll(executorNotifyTask,
restoreCheckpointIndexTask.AsTask()).ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,20 @@ internal ValueTask<RunnerStateData> ExportStateAsync()
return new(result);
}

/// <summary>
/// Exports the state held by this context's edge map, keyed by <see cref="EdgeId"/>.
/// </summary>
/// <remarks>
/// Calls <c>CheckEnded()</c> first, then delegates to <c>_edgeMap.ExportStateAsync()</c>.
/// NOTE(review): <c>CheckEnded()</c> presumably rejects use of a context that has already ended;
/// confirm against its definition.
/// </remarks>
/// <returns>The value produced by the edge map's <c>ExportStateAsync()</c>.</returns>
internal ValueTask<Dictionary<EdgeId, PortableValue>> ExportEdgeStateAsync()
{
this.CheckEnded();

return this._edgeMap.ExportStateAsync();
}

/// <summary>
/// Imports edge state from the given checkpoint into this context's edge map.
/// </summary>
/// <remarks>
/// Calls <c>CheckEnded()</c> first, then delegates to <c>_edgeMap.ImportStateAsync(checkpoint)</c>.
/// NOTE(review): <c>CheckEnded()</c> presumably rejects use of a context that has already ended;
/// confirm against its definition.
/// </remarks>
/// <param name="checkpoint">The checkpoint passed through to the edge map's <c>ImportStateAsync</c>.</param>
/// <returns>The task returned by the edge map's <c>ImportStateAsync</c>.</returns>
internal ValueTask ImportEdgeStateAsync(Checkpoint checkpoint)
{
this.CheckEnded();

return this._edgeMap.ImportStateAsync(checkpoint);
}
Comment on lines +414 to +426

internal async ValueTask RepublishUnservicedRequestsAsync(CancellationToken cancellationToken = default)
{
this.CheckEnded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,72 @@ internal async Task Checkpoint_Restore_ClearsQueuedExternalResponsesBeforeImport
"the workflow should finish once the replayed request receives a fresh response");
}

/// <summary>
/// Verifies that fan-in edge state buffered before a checkpoint is still present after resume.
/// </summary>
/// <remarks>
/// The workflow wires <c>BeforePause</c> to the <c>Approval</c> request port, the port to
/// <c>AfterResume</c>, and both executors into <c>Sink</c> through a fan-in barrier edge.
/// <c>BeforePause</c> sends its barrier contribution and then the approval request, so the
/// barrier is only partially satisfied while the request is outstanding. The test resumes from
/// the captured checkpoint, answers the replayed request, and expects both contributions
/// ("before" and "after") to reach the sink.
/// </remarks>
/// <param name="environment">The in-process execution environment variant to run under.</param>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
internal async Task Checkpoint_Resume_PreservesFanInBarrierBufferedMessagesAsync(ExecutionEnvironment environment)
{
// Arrange
const string RequestPortId = "Approval";
const string SinkId = "Sink";

// The sources are given the ids they send to: BeforePause targets both the request port and the sink,
// AfterResume targets only the sink.
ExecutorBinding beforePause = new PreCheckpointBarrierSource("BeforePause", RequestPortId, SinkId);
ExecutorBinding afterResume = new PostCheckpointBarrierSource("AfterResume", SinkId);
ExecutorBinding sink = new BarrierSink(SinkId);
RequestPort<ApprovalRequest, ApprovalReply> requestPort = RequestPort.Create<ApprovalRequest, ApprovalReply>(RequestPortId);

// Topology: beforePause -> requestPort -> afterResume, with [beforePause, afterResume] joined into sink
// by a fan-in barrier edge.
Workflow workflow = new WorkflowBuilder(beforePause)
.AddEdge(beforePause, requestPort)
.AddEdge(requestPort, afterResume)
.AddFanInBarrierEdge([beforePause, afterResume], sink)
.Build();

CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

ExternalRequest pendingRequest;
CheckpointInfo checkpoint;

// First run: start the workflow with "start" and capture the pending request together with a checkpoint.
// The run is disposed when this block exits, so the resume below starts from the checkpoint alone.
// NOTE(review): CapturePendingRequestAndCheckpointAsync is defined outside this view; confirm which
// checkpoint it returns.
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
.RunStreamingAsync(workflow, "start"))
{
(pendingRequest, checkpoint) = await CapturePendingRequestAndCheckpointAsync(firstRun);
}

// Act
// Resume from the captured checkpoint using the same checkpoint manager.
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
.ResumeStreamingAsync(workflow, checkpoint);

// Exactly one request event is expected among the events read after resume.
List<WorkflowEvent> resumedEvents = await ReadToHaltAsync(resumed);
ExternalRequest replayedRequest = resumedEvents.OfType<RequestInfoEvent>()
.Select(evt => evt.Request)
.Should()
.ContainSingle("resume should replay the request captured after the first fan-in source")
.Subject;

// Answering the replayed request lets AfterResume handle the reply and send the second barrier contribution.
await resumed.SendResponseAsync(replayedRequest.CreateResponse(new ApprovalReply("yes")));

List<WorkflowEvent> completionEvents = await ReadToHaltAsync(resumed);

// Assert
completionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
"resuming across a partially satisfied fan-in barrier should not raise workflow errors");

// The sink raises one BarrierReleasedEvent per contribution it handles; collect their sources.
string[] outputs = [.. completionEvents.OfType<BarrierReleasedEvent>().Select(evt => evt.Source)];
outputs.Should().BeEquivalentTo(["before", "after"],
"the barrier should release the contribution buffered before the checkpoint and the one produced after resume");

RunStatus status = await resumed.GetStatusAsync();
status.Should().Be(RunStatus.Idle,
"the fan-in target should run after the post-resume source contributes");

// The request seen after resume must carry the same id as the one captured in the first run.
pendingRequest.RequestId.Should().Be(replayedRequest.RequestId,
"the replayed request should be the one from the checkpointed superstep");
}

/// <summary>
/// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow.
/// </summary>
Expand Down Expand Up @@ -484,4 +550,48 @@ private static async ValueTask<List<WorkflowEvent>> ReadToHaltAsync(StreamingRun

return events;
}

// Message the two barrier sources send to the sink; Source is "before" or "after" in this test.
private sealed record BarrierContribution(string Source);

// Message PreCheckpointBarrierSource sends to the request port.
private sealed record ApprovalRequest(string Prompt);

// Response type of the request port; PostCheckpointBarrierSource handles it.
private sealed record ApprovalReply(string Value);

/// <summary>
/// Workflow event raised by <c>BarrierSink</c> for each <c>BarrierContribution</c> it handles.
/// </summary>
private sealed class BarrierReleasedEvent(string source) : WorkflowEvent
{
/// <summary>The <c>Source</c> value of the contribution this event was raised for.</summary>
public string Source { get; } = source;
}

/// <summary>
/// Test executor that handles a <see cref="string"/> input by sending a "before"
/// <see cref="BarrierContribution"/> to the sink and then an <see cref="ApprovalRequest"/>
/// to the request port.
/// </summary>
private sealed class PreCheckpointBarrierSource(string id, string requestPortId, string sinkId) : Executor(id)
{
    /// <summary>Registers the string handler and declares the two message types this executor sends.</summary>
    protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder)
        => protocolBuilder
            .ConfigureRoutes(routes => routes.AddHandler<string>(this.HandleAsync))
            .SendsMessage<BarrierContribution>()
            .SendsMessage<ApprovalRequest>();

    /// <summary>Sends the barrier contribution first, then the approval request.</summary>
    private async ValueTask HandleAsync(string input, IWorkflowContext ctx)
    {
        // The contribution goes to the sink before the request is issued.
        BarrierContribution contribution = new("before");
        await ctx.SendMessageAsync(contribution, sinkId).ConfigureAwait(false);

        ApprovalRequest approval = new("continue?");
        await ctx.SendMessageAsync(approval, requestPortId).ConfigureAwait(false);
    }
}

/// <summary>
/// Test executor that handles an <see cref="ApprovalReply"/> by sending an "after"
/// <see cref="BarrierContribution"/> to the sink.
/// </summary>
private sealed class PostCheckpointBarrierSource(string id, string sinkId) : Executor(id)
{
    /// <summary>Registers the reply handler and declares the message type this executor sends.</summary>
    protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder)
        => protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler<ApprovalReply>(this.HandleAsync))
                          .SendsMessage<BarrierContribution>();

    /// <summary>
    /// Sends the "after" contribution to the sink. The reply content is not inspected; the
    /// parameter exists so the method matches the <see cref="ApprovalReply"/> handler shape.
    /// </summary>
    private ValueTask HandleAsync(ApprovalReply reply, IWorkflowContext ctx)
        => ctx.SendMessageAsync(new BarrierContribution("after"), sinkId);
}

/// <summary>
/// Test executor at the fan-in target: raises a <see cref="BarrierReleasedEvent"/> for every
/// <see cref="BarrierContribution"/> it handles.
/// </summary>
private sealed class BarrierSink(string id) : Executor(id)
{
    /// <summary>Registers the contribution handler.</summary>
    protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder)
        => protocolBuilder.ConfigureRoutes(
            routes => routes.AddHandler<BarrierContribution>(this.HandleAsync));

    /// <summary>Adds an event carrying the contribution's source to the workflow context.</summary>
    private ValueTask HandleAsync(BarrierContribution contribution, IWorkflowContext ctx)
        => ctx.AddEventAsync(
            new BarrierReleasedEvent(contribution.Source));
}
}