.NET: [Feature Branch] Adding support for events & shared state in durable workflows#4020
.NET: [Feature Branch] Adding support for events & shared state in durable workflows#4020kshyju wants to merge 8 commits intofeat/durable_taskfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds durable-workflow event streaming and shared state support to the .NET DurableTask workflow runtime, enabling real-time observability (via StreamAsync/WatchStreamAsync) and cross-executor state without passing data through the message chain.
Changes:
- Introduces
IWorkflowClient.StreamAsync<TInput>andIStreamingWorkflowRunfor live event streaming. - Implements shared-state + event/halt plumbing through durable activity inputs/outputs and orchestration custom status.
- Adds two console samples (events + shared state) and integration tests validating end-to-end behavior.
Reviewed changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs | Adds integration validations for new samples |
| dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctions.cs | Uses shared HTTP error-response helper |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/TypedPayload.cs | Adds typed payload wrapper for events/messages |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/SentMessageInfo.cs | Removes old message wrapper type |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IWorkflowClient.cs | Adds StreamAsync<TInput> API |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IStreamingWorkflowRun.cs | New public streaming-run interface |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs | Orchestrator: shared state + event accumulation + halt |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRun.cs | Uses streaming result extraction logic |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowResult.cs | Output wrapper including events + result |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowJsonContext.cs | Updates STJ source-gen registrations |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowFailedEvent.cs | Public terminal failure event |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCustomStatus.cs | Custom-status model for streaming |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCompletedEvent.cs | Public terminal completion event |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowClient.cs | Implements StreamAsync scheduling |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs | Poll-based streaming implementation |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs | Shared JSON options for runtime types |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableRunStatus.cs | Adds durable run status enum |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableHaltRequestedEvent.cs | Public halt-request event |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs | Passes shared state into activities |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityOutput.cs | Activity output now includes events/state/halt |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityExecutor.cs | Serializes events + state updates in output |
| dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityContext.cs | Implements shared-state + event/halt APIs |
| dotnet/src/Microsoft.Agents.AI.DurableTask/ServiceCollectionExtensions.cs | Refactors binding classification helper |
| dotnet/samples/Durable/Workflow/ConsoleApps/07_WorkflowSharedState/README.md | Documents shared-state sample behavior |
| dotnet/samples/Durable/Workflow/ConsoleApps/07_WorkflowSharedState/Program.cs | Shared-state streaming sample entrypoint |
| dotnet/samples/Durable/Workflow/ConsoleApps/07_WorkflowSharedState/Executors.cs | Sample executors using shared state + halt |
| dotnet/samples/Durable/Workflow/ConsoleApps/07_WorkflowSharedState/07_WorkflowSharedState.csproj | Adds new shared-state sample project |
| dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/README.md | Documents events + streaming usage |
| dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/Program.cs | Events streaming sample entrypoint |
| dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/Executors.cs | Sample executors emitting custom events |
| dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/05_WorkflowEvents.csproj | Adds new workflow-events sample project |
| dotnet/agent-framework-dotnet.slnx | Includes new samples in solution |
dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityContext.cs
Outdated
Show resolved
Hide resolved
dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowContext.cs
Show resolved
Hide resolved
dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs
Show resolved
Hide resolved
dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs
Outdated
Show resolved
Hide resolved
dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs
Outdated
Show resolved
Hide resolved
dotnet/src/Microsoft.Agents.AI.DurableTask/ServiceCollectionExtensions.cs
Show resolved
Hide resolved
dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs
Outdated
Show resolved
Hide resolved
dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs
Outdated
Show resolved
Hide resolved
dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs
Outdated
Show resolved
Hide resolved
...ests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs
Show resolved
Hide resolved
The integration test asserts that WorkflowOutputEvent is found in the stream, but the sample executors only used AddEventAsync for custom events and never called YieldOutputAsync. Since WorkflowOutputEvent is only emitted via explicit YieldOutputAsync calls, the assertion would fail. Added YieldOutputAsync to each executor to match the test expectation and demonstrate the API in the sample. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
cgillum
left a comment
There was a problem hiding this comment.
I haven't gotten through the full PR yet, but here's some initial feedback/questions. Mostly very minor things so far.
...et/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/06_WorkflowSharedState.csproj
Show resolved
Hide resolved
| Console.WriteLine($" Read from shared state: shippingTier = {shippingTier}"); | ||
|
|
||
| // Write shipping details under a custom "shipping" scope. | ||
| // Scoped keys are isolated from the default namespace, so "carrier" here |
There was a problem hiding this comment.
Just my ignorance: What is a "default namespace" and what is a "scoped key"?
There was a problem hiding this comment.
That comment shouldn't be here. It was more of an implementation detail (how the keys are stored behind the scene when QueueStateUpdateAsync method is called). I updated the comments to remove it, so the comments are cleaner.
| Console.WriteLine($" Completed: {e.Result}"); | ||
| break; | ||
|
|
||
| case DurableWorkflowFailedEvent e: |
There was a problem hiding this comment.
Sorry if this is outside the scope of this PR, but IIRC there are some built-in MAF events for various lifecycle state transitions. Does MAF already have something for workflow completion/failed? I'm wondering why we need to have Durable-specific versions.
There was a problem hiding this comment.
Good question. MAF core currently doesn't have a WorkflowCompletedEvent at all, so DurableWorkflowCompletedEvent fills that gap. For the failure case, MAF has WorkflowErrorEvent but it carries an Exception? object. In the durable streaming context (via WatchStreamAsync), we only have the error message string from the orchestration status. There's no Exception instance available to construct a WorkflowErrorEvent. That's why DurableWorkflowFailedEvent carries string ErrorMessage instead.
For halt, MAF has RequestHaltEvent but it's internal, so it can't be yielded to external stream consumers.
We are using existing MAF events which are available to use in other areas of durable workflow (ex: WorkflowOutputEvent , ExecutorInvokedEvent and ExecutorCompletedEvent)
All three durable specific evernts I created extend WorkflowEvent from MAF.
I have a GH issue opened in the repo to expose these events as public. Once they are(before GA), we can switch to them and remove our copy. #4063
There was a problem hiding this comment.
Thanks for the explanation. One thought I have is that we can do better than just string ErrorMessage and provide TaskFailureDetails FailureDetails from the Durable Task SDK so that the user gets the error type, message, stack trace, and inner-failure details.
| Console.WriteLine(" Wrote to shared state: shipping:estimatedDays = 2"); | ||
|
|
||
| // Verify we can read the audit entry from the previous step | ||
| AuditEntry? previousAudit = await context.ReadStateAsync<AuditEntry>("audit:validate", cancellationToken: cancellationToken); |
There was a problem hiding this comment.
More ignorance on my part: I noticed that the update API has a separate scope and key value, but it seems the read API only has a single input type containing both the scope and the key. Is this a MAF convention? I'm wondering why we wouldn't have a more type-safe API for this.
There was a problem hiding this comment.
The read and update APIs actually have the same signature. Both take (string key, string? scopeName = null, ...):
I think my previous sample code was not the easiest to follow. The audit keys used : in the key name (e.g., "audit:validate"), which made it look like a combined scope+key format. I've cleaned this up now.
- Renamed audit keys to camelCase (auditValidate, auditEnrich, auditPayment)
- Simplified the scoped write to a single key (carrier with scopeName: "shipping")
- Added a scoped read in GenerateInvoice so the sample demonstrates the full write → read → clear lifecycle with scopes
| | **ProcessPayment** | payment ref string | `taxRate` | `audit:payment` | | ||
| | **GenerateInvoice** | invoice string | `audit:validate`, `audit:enrich`, `audit:payment` | clears `shipping` scope | | ||
|
|
||
| > **Note:** `EnrichOrder` writes `carrier` and `estimatedDays` under the `"shipping"` scope using `scopeName: "shipping"`. Scoped keys are isolated from the default namespace, so a key like `"carrier"` in the `"shipping"` scope won't collide with a `"carrier"` key in the default scope. |
There was a problem hiding this comment.
nit: instead of **Note:** we should use the standard callout syntax - e.g.
> [!NOTE]
> blah blah blahI usually have to instruct Copilot about this since it prefers the ad-hoc syntax that you have here.
| public ValueTask RequestHaltAsync() | ||
| { | ||
| this.HaltRequested = true; | ||
| this.Events.Add(new DurableHaltRequestedEvent(this._executor.Id)); |
There was a problem hiding this comment.
Hmm...so we have to emit our own lifecycle events for cases like the workflow halting?
There was a problem hiding this comment.
There is a gap today. I have a GH issue opened in the repo to expose these events as public. Once they are(before GA), we can switch to them and remove our copy. #4063
|
|
||
| // Remove any pending updates in this scope (snapshot keys to allow removal during iteration) | ||
| string scopePrefix = GetScopePrefix(scopeName); | ||
| foreach (string key in this.StateUpdates.Keys.ToArray()) |
There was a problem hiding this comment.
nit: I usually go with ToList() instead of ToArray() because ToArray() has a slower implementation last I checked.
There was a problem hiding this comment.
Oh okay. Switched to that. It might be interesting to benchmark in the latest .net versions though.
| /// Gets or sets the state updates (scope-prefixed key to value; null indicates deletion). | ||
| /// </summary> | ||
| public List<SentMessageInfo> SentMessages { get; set; } = []; | ||
| public Dictionary<string, string?> StateUpdates { get; set; } = []; |
There was a problem hiding this comment.
Do all these collection properties need to be settable?
There was a problem hiding this comment.
Changed all properties from set to init. They only need to be settable during construction/deserialization, not afterward.
| public enum DurableRunStatus | ||
| { | ||
| /// <summary> | ||
| /// The orchestration instance was not found. |
There was a problem hiding this comment.
It looks like this is a public API, so should we be using "workflow" instead of "orchestration" in these descriptions?
There was a problem hiding this comment.
Sounds good. Updated the description.
| /// <summary> | ||
| /// Represents the execution status of a durable workflow run. | ||
| /// </summary> | ||
| public enum DurableRunStatus |
There was a problem hiding this comment.
I'm also curious why we need to define this ourselves vs. use some workflow status enum defined in MAF?
There was a problem hiding this comment.
MAF's RunStatus tracks in-process run lifecycle (NotStarted, Idle, PendingRequests, Ended, Running)
The
DurableRunStatus I added maps durable orchestration states (Completed, Failed, Terminated, Suspended, NotFound) which have no equivalent in MAF's RunStatus today. Only Running overlaps. Longer term, it might make sense to unify these at the MAF layer, but the current states don't map cleanly.
There was a problem hiding this comment.
Makes sense. I wonder if we could/should expose both RunStatus and DurableRunStatus to the user so that existing MAF tooling can consume the MAF status and the durable status can be used to get more information?
cgillum
left a comment
There was a problem hiding this comment.
Got through the full PR! Added a few more comments.
| /// Output payload from activity execution, containing the result and other metadata. | ||
| /// Output payload from activity execution, containing the result, state updates, and emitted events. | ||
| /// </summary> | ||
| internal sealed class DurableActivityOutput |
There was a problem hiding this comment.
Maybe call this a DurableExecutorOutput?
There was a problem hiding this comment.
Makes sense. Renamed to DurableExecutorOutput.
| /// <summary> | ||
| /// Represents the execution status of a durable workflow run. | ||
| /// </summary> | ||
| public enum DurableRunStatus |
There was a problem hiding this comment.
Makes sense. I wonder if we could/should expose both RunStatus and DurableRunStatus to the user so that existing MAF tooling can consume the MAF status and the durable status can be used to get more information?
| if (metadata.SerializedCustomStatus is not null) | ||
| { | ||
| DurableWorkflowCustomStatus? customStatus = TryParseCustomStatus(metadata.SerializedCustomStatus); | ||
| if (customStatus is not null) |
There was a problem hiding this comment.
nit: it's more conventional to have a TryXXX method return a bool and put the result as an out parameter. That would also allow you to combine these two lines into one and remove the null check.
| } | ||
| else | ||
| { | ||
| yield return new DurableWorkflowCompletedEvent(metadata.SerializedOutput); |
There was a problem hiding this comment.
Is this ever expected? I think we should add a comment here about when this situation might occur and how we expect users to consume the raw serialized output.
There was a problem hiding this comment.
I added this as a defensive fallback. Ideally this line of code should not be executed as we expect the runner to always wraps output in DurableWorkflowResult. I changed this to return a more strict response , DurableWorkflowFailedEvent now with the failure details.
| { | ||
| // The framework clears custom status on completion, so events may be in | ||
| // SerializedOutput as a DurableWorkflowResult wrapper. | ||
| DurableWorkflowResult? outputResult = TryParseWorkflowResult(metadata.SerializedOutput); |
There was a problem hiding this comment.
nit: same comment about TryXXX methods.
| if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Failed) | ||
| { | ||
| string errorMessage = metadata.FailureDetails?.ErrorMessage ?? "Workflow execution failed."; | ||
| yield return new DurableWorkflowFailedEvent(errorMessage); |
There was a problem hiding this comment.
Related to an earlier comment - ideally, we'd send back the full metadata.FailureDetails object.
There was a problem hiding this comment.
Yes, updated to include FailureDetails.
| if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Failed) | ||
| { | ||
| string errorMessage = metadata.FailureDetails?.ErrorMessage ?? "Workflow execution failed."; | ||
| throw new InvalidOperationException(errorMessage); |
There was a problem hiding this comment.
It might be more helpful to throw a TaskFailedException here and pass the metadata.FailureDetails is a constructor argument.
There was a problem hiding this comment.
Switched to TaskFailedException with FailureDetails
| /// This interface defines the contract for streaming workflow runs in durable execution | ||
| /// environments. Implementations provide real-time access to workflow events. | ||
| /// </remarks> | ||
| public interface IStreamingWorkflowRun |
There was a problem hiding this comment.
I forget if I asked this already in the previous review, but why do we need our own interface types? Do the MAF ones not work for us?
There was a problem hiding this comment.
MAF's StreamingRun is a concrete class tied to in process execution.It holds an AsyncRunHandle and supports SendResponseAsync, TrySendMessageAsync, and CancelRunAsync, none of which apply to the durable (polling-based) execution model. The durable streaming run polls orchestration metadata externally and has its own lifecycle (e.g., GetStatusAsync returning DurableRunStatus, WaitForCompletionAsync). We defined IStreamingWorkflowRun as a minimal interface (RunId + WatchStreamAsync) that captures just the common streaming contract. If MAF core library introduces an abstraction over both in-process and durable streaming, we can switch to that.
| // Assert | ||
| Assert.Equal(2, events.Count); | ||
| Assert.IsType<DurableHaltRequestedEvent>(events[0]); | ||
| Assert.IsType<DurableWorkflowCompletedEvent>(events[1]); |
There was a problem hiding this comment.
Should we also evaluate the payloads for these events, or is that covered elsewhere?
There was a problem hiding this comment.
Yea, updated tests to assert that as well.
Motivation and Context
This PR adds workflow events and shared state support for durable workflows, building on the foundational workflow APIs introduced in #3648 and the Azure Functions hosting support added in #3935. While previous PRs established the core execution model, this PR enables real-time observability and cross-executor data sharing — two capabilities essential for production workflow scenarios.
Key scenarios enabled:
Description
This PR introduces event streaming and shared state infrastructure for durable workflows, along with new public APIs and supporting types.
New Public APIs
IWorkflowClient.StreamAsync<TInput>IStreamingWorkflowRunfor real-time event observation.IStreamingWorkflowRunRunIdandWatchStreamAsync()that yieldsWorkflowEventinstances as they occur.DurableWorkflowCompletedEventDurableWorkflowFailedEventDurableHaltRequestedEventRequestHaltAsync).Shared State APIs (via
IWorkflowContext)Shared state is implemented through the existing
IWorkflowContextinterface, which executors already receive inHandleAsync. No new public types are needed — the following methods are now functional in the durable context:QueueStateUpdateAsyncReadStateAsync<T>ReadOrInitStateAsync<T>QueueClearScopeAsyncRequestHaltAsyncAddEventAsyncYieldOutputAsyncWorkflowOutputEventInternal Infrastructure Changes
DurableActivityContextIWorkflowContext— state read/write, events, halt requests, and scoped state managementDurableWorkflowRunnerDurableStreamingWorkflowRunDurableActivityExecutorDurableExecutorDispatcherDurableWorkflowJsonContextHow Event Streaming Works
The orchestration accumulates events from activity outputs and publishes them via
SetCustomStatusafter each superstep.DurableStreamingWorkflowRunpolls the orchestration metadata at a configurable interval, deserializes new events, and yields them throughWatchStreamAsync. On completion, events are also stored in the orchestration output as a fallback (sinceSerializedCustomStatusis cleared by the framework on completion).How Shared State Works
Shared state is a
Dictionary<string, string>maintained by the orchestration. Before each activity dispatch, the current state snapshot is passed to the activity. Activities collect state updates locally and return them as part of their output. The orchestration merges updates after each superstep, making them available to the next round of executors. State keys can be partitioned into scopes (e.g.,scopeName: "shipping") so that different parts of a workflow can manage their state independently.Validation/Testing
Samples Added:
05_WorkflowEventsAddEventAsync) and real-time streaming (StreamAsync/WatchStreamAsync) with an order cancellation workflow. No Azure OpenAI required.07_WorkflowSharedStateQueueStateUpdateAsync,ReadStateAsync,ReadOrInitStateAsync, scoped writes, andQueueClearScopeAsync. No Azure OpenAI required.Integration Tests: Integration tests have been added (
WorkflowConsoleAppSamplesValidation.cs) to validate both samples —WorkflowEventsSampleValidationAsyncandWorkflowSharedStateSampleValidationAsync— ensuring end-to-end functionality works as expected.Both samples have been manually verified against the local DTS emulator.
Contribution Checklist