diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 19547875b3..2c5ea815c5 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -52,6 +52,8 @@ + + diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/05_WorkflowEvents.csproj b/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/05_WorkflowEvents.csproj new file mode 100644 index 0000000000..09e20ef622 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/05_WorkflowEvents.csproj @@ -0,0 +1,28 @@ + + + net10.0 + Exe + enable + enable + WorkflowEvents + WorkflowEvents + + + + + + + + + + + + + + + diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/Executors.cs b/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/Executors.cs new file mode 100644 index 0000000000..47880f0fff --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/Executors.cs @@ -0,0 +1,129 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows; + +namespace WorkflowEvents; + +// ═══════════════════════════════════════════════════════════════════════════════ +// Custom event types - callers observe these via WatchStreamAsync +// ═══════════════════════════════════════════════════════════════════════════════ + +internal sealed class OrderLookupStartedEvent(string orderId) : WorkflowEvent(orderId) +{ + public string OrderId { get; } = orderId; +} + +internal sealed class OrderFoundEvent(string customerName) : WorkflowEvent(customerName) +{ + public string CustomerName { get; } = customerName; +} + +internal sealed class CancellationProgressEvent(int percentComplete, string status) : WorkflowEvent(status) +{ + public int PercentComplete { get; } = percentComplete; + public string Status { get; } = status; +} + +internal sealed class OrderCancelledEvent() : WorkflowEvent("Order cancelled"); + +internal sealed class EmailSentEvent(string email) : WorkflowEvent(email) +{ + public string Email { get; } = email; +} + +// ═══════════════════════════════════════════════════════════════════════════════ +// Domain models +// ═══════════════════════════════════════════════════════════════════════════════ + +internal sealed record Order(string Id, DateTime OrderDate, bool IsCancelled, string? CancelReason, Customer Customer); + +internal sealed record Customer(string Name, string Email); + +// ═══════════════════════════════════════════════════════════════════════════════ +// Executors - emit events via AddEventAsync and YieldOutputAsync +// ═══════════════════════════════════════════════════════════════════════════════ + +/// +/// Looks up an order by ID, emitting progress events. +/// +internal sealed class OrderLookup() : Executor("OrderLookup") +{ + public override async ValueTask HandleAsync( + string message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + await context.AddEventAsync(new OrderLookupStartedEvent(message), cancellationToken); + + // Simulate database lookup + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); + + Order order = new( + Id: message, + OrderDate: DateTime.UtcNow.AddDays(-1), + IsCancelled: false, + CancelReason: "Customer requested cancellation", + Customer: new Customer(Name: "Jerry", Email: "jerry@example.com")); + + await context.AddEventAsync(new OrderFoundEvent(order.Customer.Name), cancellationToken); + + // YieldOutputAsync emits a WorkflowOutputEvent observable via streaming + await context.YieldOutputAsync(order, cancellationToken); + + return order; + } +} + +/// +/// Cancels an order, emitting progress events during the multi-step process. +/// +internal sealed class OrderCancel() : Executor("OrderCancel") +{ + public override async ValueTask HandleAsync( + Order message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + await context.AddEventAsync(new CancellationProgressEvent(0, "Starting cancellation"), cancellationToken); + + // Simulate a multi-step cancellation process + await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); + await context.AddEventAsync(new CancellationProgressEvent(33, "Contacting payment provider"), cancellationToken); + + await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); + await context.AddEventAsync(new CancellationProgressEvent(66, "Processing refund"), cancellationToken); + + await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); + + Order cancelledOrder = message with { IsCancelled = true }; + await context.AddEventAsync(new CancellationProgressEvent(100, "Complete"), cancellationToken); + await context.AddEventAsync(new OrderCancelledEvent(), cancellationToken); + + await context.YieldOutputAsync(cancelledOrder, cancellationToken); + + return cancelledOrder; + } +} + +/// +/// Sends a cancellation confirmation email, emitting an event on completion. +/// +internal sealed class SendEmail() : Executor("SendEmail") +{ + public override async ValueTask HandleAsync( + Order message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + // Simulate sending email + await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); + + string result = $"Cancellation email sent for order {message.Id} to {message.Customer.Email}."; + + await context.AddEventAsync(new EmailSentEvent(message.Customer.Email), cancellationToken); + + await context.YieldOutputAsync(result, cancellationToken); + + return result; + } +} diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/Program.cs b/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/Program.cs new file mode 100644 index 0000000000..7ca3ae77c1 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/Program.cs @@ -0,0 +1,138 @@ +// Copyright (c) Microsoft. All rights reserved. + +// ═══════════════════════════════════════════════════════════════════════════════ +// SAMPLE: Workflow Events and Streaming +// ═══════════════════════════════════════════════════════════════════════════════ +// +// This sample demonstrates how to use IWorkflowContext event methods in executors +// and stream events from the caller side: +// +// 1. AddEventAsync - Emit custom events that callers can observe in real-time +// 2. StreamAsync - Start a workflow and obtain a streaming handle +// 3. WatchStreamAsync - Observe events as they occur (custom, framework, and terminal) +// +// The sample uses IWorkflowClient.StreamAsync to start a workflow and +// WatchStreamAsync to observe events as they occur in real-time. +// +// Workflow: OrderLookup -> OrderCancel -> SendEmail +// ═══════════════════════════════════════════════════════════════════════════════ + +using Microsoft.Agents.AI.DurableTask; +using Microsoft.Agents.AI.DurableTask.Workflows; +using Microsoft.Agents.AI.Workflows; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using WorkflowEvents; + +// Get DTS connection string from environment variable +string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING") + ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"; + +// Define executors and build workflow +OrderLookup orderLookup = new(); +OrderCancel orderCancel = new(); +SendEmail sendEmail = new(); + +Workflow cancelOrder = new WorkflowBuilder(orderLookup) + .WithName("CancelOrder") + .WithDescription("Cancel an order and notify the customer") + .AddEdge(orderLookup, orderCancel) + .AddEdge(orderCancel, sendEmail) + .Build(); + +// Configure host with durable workflow support +IHost host = Host.CreateDefaultBuilder(args) + .ConfigureLogging(logging => logging.SetMinimumLevel(LogLevel.Warning)) + .ConfigureServices(services => + { + services.ConfigureDurableWorkflows( + workflowOptions => workflowOptions.AddWorkflow(cancelOrder), + workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString), + clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString)); + }) + .Build(); + +await host.StartAsync(); + +IWorkflowClient workflowClient = host.Services.GetRequiredService(); + +Console.WriteLine("Workflow Events Demo - Enter order ID (or 'exit'):"); + +while (true) +{ + Console.Write("> "); + string? input = Console.ReadLine(); + if (string.IsNullOrWhiteSpace(input) || input.Equals("exit", StringComparison.OrdinalIgnoreCase)) + { + break; + } + + try + { + await RunWorkflowWithStreamingAsync(input, cancelOrder, workflowClient); + } + catch (Exception ex) + { + Console.WriteLine($"Error: {ex.Message}"); + } + + Console.WriteLine(); +} + +await host.StopAsync(); + +// Runs a workflow and streams events as they occur +static async Task RunWorkflowWithStreamingAsync(string orderId, Workflow workflow, IWorkflowClient client) +{ + // StreamAsync starts the workflow and returns a streaming handle for observing events + IStreamingWorkflowRun run = await client.StreamAsync(workflow, orderId); + Console.WriteLine($"Started run: {run.RunId}"); + + // WatchStreamAsync yields events as they're emitted by executors + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + Console.WriteLine($" New event received at {DateTime.Now:HH:mm:ss.ffff} ({evt.GetType().Name})"); + + switch (evt) + { + // Custom domain events (emitted via AddEventAsync) + case OrderLookupStartedEvent e: + WriteColored($" [Lookup] Looking up order {e.OrderId}", ConsoleColor.Cyan); + break; + case OrderFoundEvent e: + WriteColored($" [Lookup] Found: {e.CustomerName}", ConsoleColor.Cyan); + break; + case CancellationProgressEvent e: + WriteColored($" [Cancel] {e.PercentComplete}% - {e.Status}", ConsoleColor.Yellow); + break; + case OrderCancelledEvent: + WriteColored(" [Cancel] Done", ConsoleColor.Yellow); + break; + case EmailSentEvent e: + WriteColored($" [Email] Sent to {e.Email}", ConsoleColor.Magenta); + break; + + case WorkflowOutputEvent e: + WriteColored($" [Output] {e.SourceId}", ConsoleColor.DarkGray); + break; + + // Workflow completion + case DurableWorkflowCompletedEvent e: + WriteColored($" Completed: {e.Result}", ConsoleColor.Green); + break; + case DurableWorkflowFailedEvent e: + WriteColored($" Failed: {e.ErrorMessage}", ConsoleColor.Red); + break; + } + } +} + +static void WriteColored(string message, ConsoleColor color) +{ + Console.ForegroundColor = color; + Console.WriteLine(message); + Console.ResetColor(); +} diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/README.md b/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/README.md new file mode 100644 index 0000000000..00012c5afb --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/README.md @@ -0,0 +1,127 @@ +# Workflow Events Sample + +This sample demonstrates how to use workflow events and streaming in durable workflows. + +## What it demonstrates + +1. **Custom Events** (`AddEventAsync`) — Executors emit domain-specific events during execution +2. **Event Streaming** (`StreamAsync` / `WatchStreamAsync`) — Callers observe events in real-time as the workflow progresses +3. **Framework Events** — Automatic `ExecutorInvokedEvent`, `ExecutorCompletedEvent`, and `WorkflowOutputEvent` events emitted by the framework + +## Emitting Custom Events + +Executors can emit custom domain events during execution using the `IWorkflowContext` instance passed to `HandleAsync`. These events are streamed to callers in real-time via `WatchStreamAsync`. + +### Defining a custom event + +Create a class that inherits from `WorkflowEvent`. Pass any data payload to the base constructor: + +```csharp +public class CancellationProgressEvent(int percentComplete, string status) : WorkflowEvent(status) +{ + public int PercentComplete { get; } = percentComplete; + public string Status { get; } = status; +} +``` + +### Emitting the event from an executor + +Call `AddEventAsync` on the `IWorkflowContext` inside your executor's `HandleAsync` method: + +```csharp +public override async ValueTask HandleAsync( + Order message, + IWorkflowContext context, + CancellationToken cancellationToken = default) +{ + await context.AddEventAsync(new CancellationProgressEvent(33, "Processing refund"), cancellationToken); + // ... rest of the executor logic +} +``` + +### Observing events from the caller + +Use `StreamAsync` to start the workflow and `WatchStreamAsync` to observe events. Pattern match on your custom event types: + +```csharp +IStreamingWorkflowRun run = await workflowClient.StreamAsync(workflow, input); + +await foreach (WorkflowEvent evt in run.WatchStreamAsync()) +{ + switch (evt) + { + case CancellationProgressEvent e: + Console.WriteLine($"{e.PercentComplete}% - {e.Status}"); + break; + } +} +``` + +## Workflow Structure + +``` +OrderLookup → OrderCancel → SendEmail +``` + +Each executor emits custom events during execution: +- `OrderLookup` emits `OrderLookupStartedEvent` and `OrderFoundEvent` +- `OrderCancel` emits `CancellationProgressEvent` (with percentage) and `OrderCancelledEvent` +- `SendEmail` emits `EmailSentEvent` + +## Prerequisites + +- [Durable Task Scheduler](https://learn.microsoft.com/azure/azure-functions/durable/durable-task-scheduler) running locally or in Azure +- Set the `DURABLE_TASK_SCHEDULER_CONNECTION_STRING` environment variable (defaults to local emulator) + +## Environment Setup + +See the [README.md](../../README.md) file in the parent directory for more information on how to configure the environment, including how to install and run common sample dependencies. + +## Running the sample + +```bash +dotnet run +``` + +Enter an order ID at the prompt to start a workflow and watch events stream in real-time: + +```text +> order-42 +Started run: b6ba4d19... + New event received at 13:27:41.4956 (ExecutorInvokedEvent) + New event received at 13:27:41.5019 (OrderLookupStartedEvent) + [Lookup] Looking up order order-42 + New event received at 13:27:41.5025 (OrderFoundEvent) + [Lookup] Found: Jerry + New event received at 13:27:41.5026 (ExecutorCompletedEvent) + New event received at 13:27:41.5026 (WorkflowOutputEvent) + [Output] OrderLookup + New event received at 13:27:43.0772 (ExecutorInvokedEvent) + New event received at 13:27:43.0773 (CancellationProgressEvent) + [Cancel] 0% - Starting cancellation + New event received at 13:27:43.0775 (CancellationProgressEvent) + [Cancel] 33% - Contacting payment provider + New event received at 13:27:43.0776 (CancellationProgressEvent) + [Cancel] 66% - Processing refund + New event received at 13:27:43.0777 (CancellationProgressEvent) + [Cancel] 100% - Complete + New event received at 13:27:43.0779 (OrderCancelledEvent) + [Cancel] Done + New event received at 13:27:43.0780 (ExecutorCompletedEvent) + New event received at 13:27:43.0780 (WorkflowOutputEvent) + [Output] OrderCancel + New event received at 13:27:43.6610 (ExecutorInvokedEvent) + New event received at 13:27:43.6611 (EmailSentEvent) + [Email] Sent to jerry@example.com + New event received at 13:27:43.6613 (ExecutorCompletedEvent) + New event received at 13:27:43.6613 (WorkflowOutputEvent) + [Output] SendEmail + New event received at 13:27:43.6619 (DurableWorkflowCompletedEvent) + Completed: Cancellation email sent for order order-42 to jerry@example.com. +``` + +### Viewing Workflows in the DTS Dashboard + +After running a workflow, you can navigate to the Durable Task Scheduler (DTS) dashboard to inspect the workflow execution and events. + +If you are using the DTS emulator, the dashboard is available at `http://localhost:8082`. diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/06_WorkflowSharedState.csproj b/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/06_WorkflowSharedState.csproj new file mode 100644 index 0000000000..c7efbb7d1b --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/06_WorkflowSharedState.csproj @@ -0,0 +1,29 @@ + + + net10.0 + Exe + enable + enable + WorkflowSharedState + WorkflowSharedState + + + + + + + + + + + + + + + + diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/Executors.cs b/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/Executors.cs new file mode 100644 index 0000000000..05afbab71d --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/Executors.cs @@ -0,0 +1,182 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows; + +namespace WorkflowSharedState; + +// ═══════════════════════════════════════════════════════════════════════════════ +// Domain models +// ═══════════════════════════════════════════════════════════════════════════════ + +/// +/// The primary order data passed through the pipeline via return values. +/// +internal sealed record OrderDetails(string OrderId, string CustomerName, decimal Amount, DateTime OrderDate); + +/// +/// Cross-cutting audit trail accumulated in shared state across executors. +/// Each executor appends its step name and timestamp. This data does not flow +/// through return values — it lives only in shared state. +/// +internal sealed record AuditEntry(string Step, string Timestamp, string Detail); + +// ═══════════════════════════════════════════════════════════════════════════════ +// Executors +// ═══════════════════════════════════════════════════════════════════════════════ + +/// +/// Validates the order and writes the initial audit entry and tax rate to shared state. +/// The order details are returned as the executor output (normal message flow), +/// while the audit trail and tax rate are stored in shared state (side-channel). +/// If the order ID starts with "INVALID", the executor halts the workflow early +/// using . +/// +internal sealed class ValidateOrder() : Executor("ValidateOrder") +{ + public override async ValueTask HandleAsync( + string message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken); + + // Halt the workflow early if the order ID is invalid. + // No downstream executors will run after this. + if (message.StartsWith("INVALID", StringComparison.OrdinalIgnoreCase)) + { + await context.YieldOutputAsync($"Order '{message}' failed validation. Halting workflow.", cancellationToken); + await context.RequestHaltAsync(); + return new OrderDetails(message, "Unknown", 0, DateTime.UtcNow); + } + + OrderDetails details = new(message, "Jerry", 249.99m, DateTime.UtcNow); + + // Store the tax rate in shared state — downstream ProcessPayment reads it + // without needing it in the message chain. + await context.QueueStateUpdateAsync("taxRate", 0.085m, cancellationToken: cancellationToken); + Console.WriteLine(" Wrote to shared state: taxRate = 8.5%"); + + // Start the audit trail in shared state + AuditEntry audit = new("ValidateOrder", DateTime.UtcNow.ToString("o"), $"Validated order {message}"); + await context.QueueStateUpdateAsync("auditValidate", audit, cancellationToken: cancellationToken); + Console.WriteLine(" Wrote to shared state: auditValidate"); + + await context.YieldOutputAsync($"Order '{message}' validated. Customer: {details.CustomerName}, Amount: {details.Amount:C}", cancellationToken); + + return details; + } +} + +/// +/// Enriches the order with shipping information. +/// Reads the audit trail from shared state and appends its own entry. +/// Uses ReadOrInitStateAsync to lazily initialize a shipping tier. +/// Demonstrates custom scopes by writing shipping details under the "shipping" scope. +/// +internal sealed class EnrichOrder() : Executor("EnrichOrder") +{ + public override async ValueTask HandleAsync( + OrderDetails message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken); + + // Use ReadOrInitStateAsync — only initializes if no value exists yet + string shippingTier = await context.ReadOrInitStateAsync( + "shippingTier", + () => "Express", + cancellationToken: cancellationToken); + Console.WriteLine($" Read from shared state: shippingTier = {shippingTier}"); + + // Write carrier under a custom "shipping" scope. + // This keeps the key separate from keys written without a scope, + // so "carrier" here won't collide with a "carrier" key written elsewhere. + await context.QueueStateUpdateAsync("carrier", "Contoso Express", scopeName: "shipping", cancellationToken: cancellationToken); + Console.WriteLine(" Wrote to shared state: carrier = Contoso Express (scope: shipping)"); + + // Verify we can read the audit entry from the previous step + AuditEntry? previousAudit = await context.ReadStateAsync("auditValidate", cancellationToken: cancellationToken); + string auditStatus = previousAudit is not null ? $"(previous step: {previousAudit.Step})" : "(no prior audit)"; + Console.WriteLine($" Read from shared state: auditValidate {auditStatus}"); + + // Append our own audit entry + AuditEntry audit = new("EnrichOrder", DateTime.UtcNow.ToString("o"), $"Enriched with {shippingTier} shipping {auditStatus}"); + await context.QueueStateUpdateAsync("auditEnrich", audit, cancellationToken: cancellationToken); + Console.WriteLine(" Wrote to shared state: auditEnrich"); + + await context.YieldOutputAsync($"Order enriched. Shipping: {shippingTier} {auditStatus}", cancellationToken); + + return message; + } +} + +/// +/// Processes payment using the tax rate from shared state (written by ValidateOrder). +/// The tax rate is side-channel data — it doesn't flow through return values. +/// +internal sealed class ProcessPayment() : Executor("ProcessPayment") +{ + public override async ValueTask HandleAsync( + OrderDetails message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + await Task.Delay(TimeSpan.FromMilliseconds(300), cancellationToken); + + // Read tax rate written by ValidateOrder — not available in the message chain + decimal taxRate = await context.ReadOrInitStateAsync("taxRate", () => 0.0m, cancellationToken: cancellationToken); + Console.WriteLine($" Read from shared state: taxRate = {taxRate:P1}"); + + decimal tax = message.Amount * taxRate; + decimal total = message.Amount + tax; + string paymentRef = $"PAY-{Guid.NewGuid():N}"[..16]; + + // Append audit entry + AuditEntry audit = new("ProcessPayment", DateTime.UtcNow.ToString("o"), $"Charged {total:C} (tax: {tax:C})"); + await context.QueueStateUpdateAsync("auditPayment", audit, cancellationToken: cancellationToken); + Console.WriteLine(" Wrote to shared state: auditPayment"); + + await context.YieldOutputAsync($"Payment processed. Total: {total:C} (tax: {tax:C}). Ref: {paymentRef}", cancellationToken); + + return paymentRef; + } +} + +/// +/// Generates the final invoice by reading the full audit trail from shared state. +/// Demonstrates reading multiple state entries written by different executors +/// and clearing a scope with . +/// +internal sealed class GenerateInvoice() : Executor("GenerateInvoice") +{ + public override async ValueTask HandleAsync( + string message, + IWorkflowContext context, + CancellationToken cancellationToken = default) + { + await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken); + + // Read the full audit trail from shared state — each step wrote its own entry + AuditEntry? validateAudit = await context.ReadStateAsync("auditValidate", cancellationToken: cancellationToken); + AuditEntry? enrichAudit = await context.ReadStateAsync("auditEnrich", cancellationToken: cancellationToken); + AuditEntry? paymentAudit = await context.ReadStateAsync("auditPayment", cancellationToken: cancellationToken); + int auditCount = new[] { validateAudit, enrichAudit, paymentAudit }.Count(a => a is not null); + Console.WriteLine($" Read from shared state: {auditCount} audit entries"); + + // Read carrier from the "shipping" scope (written by EnrichOrder) + string? carrier = await context.ReadStateAsync("carrier", scopeName: "shipping", cancellationToken: cancellationToken); + Console.WriteLine($" Read from shared state: carrier = {carrier} (scope: shipping)"); + + // Clear the "shipping" scope — no longer needed after invoice generation. + await context.QueueClearScopeAsync("shipping", cancellationToken); + Console.WriteLine(" Cleared shared state scope: shipping"); + + string auditSummary = string.Join(" → ", new[] + { + validateAudit?.Step, enrichAudit?.Step, paymentAudit?.Step + }.Where(s => s is not null)); + + return $"Invoice complete. Payment: {message}. Audit trail: [{auditSummary}]"; + } +} diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/Program.cs b/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/Program.cs new file mode 100644 index 0000000000..4b46779eb8 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/Program.cs @@ -0,0 +1,117 @@ +// Copyright (c) Microsoft. All rights reserved. + +// ═══════════════════════════════════════════════════════════════════════════════ +// SAMPLE: Shared State During Workflow Execution +// ═══════════════════════════════════════════════════════════════════════════════ +// +// This sample demonstrates how executors in a durable workflow can share state +// via IWorkflowContext. State is persisted across supersteps and survives +// process restarts because the orchestration passes it to each activity. +// +// Key concepts: +// 1. QueueStateUpdateAsync - Write a value to shared state +// 2. ReadStateAsync - Read a value written by a previous executor +// 3. ReadOrInitStateAsync - Read or lazily initialize a state value +// 4. QueueClearScopeAsync - Clear all entries under a scope +// 5. RequestHaltAsync - Stop the workflow early (e.g., validation failure) +// +// Workflow: ValidateOrder -> EnrichOrder -> ProcessPayment -> GenerateInvoice +// +// Return values carry primary business data through the pipeline (OrderDetails, +// payment ref). Shared state carries side-channel data that doesn't belong in +// the message chain: a tax rate (set by ValidateOrder, read by ProcessPayment) +// and an audit trail (each executor appends its own entry). +// ═══════════════════════════════════════════════════════════════════════════════ + +using Microsoft.Agents.AI.DurableTask; +using Microsoft.Agents.AI.DurableTask.Workflows; +using Microsoft.Agents.AI.Workflows; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using WorkflowSharedState; + +// Get DTS connection string from environment variable +string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING") + ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"; + +// Define executors +ValidateOrder validateOrder = new(); +EnrichOrder enrichOrder = new(); +ProcessPayment processPayment = new(); +GenerateInvoice generateInvoice = new(); + +// Build the workflow: ValidateOrder -> EnrichOrder -> ProcessPayment -> GenerateInvoice +Workflow orderPipeline = new WorkflowBuilder(validateOrder) + .WithName("OrderPipeline") + .WithDescription("Order processing pipeline with shared state across executors") + .AddEdge(validateOrder, enrichOrder) + .AddEdge(enrichOrder, processPayment) + .AddEdge(processPayment, generateInvoice) + .Build(); + +// Configure host with durable workflow support +IHost host = Host.CreateDefaultBuilder(args) + .ConfigureLogging(logging => logging.SetMinimumLevel(LogLevel.Warning)) + .ConfigureServices(services => + { + services.ConfigureDurableWorkflows( + workflowOptions => workflowOptions.AddWorkflow(orderPipeline), + workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString), + clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString)); + }) + .Build(); + +await host.StartAsync(); + +IWorkflowClient workflowClient = host.Services.GetRequiredService(); + +Console.WriteLine("Shared State Workflow Demo"); +Console.WriteLine("Workflow: ValidateOrder -> EnrichOrder -> ProcessPayment -> GenerateInvoice"); +Console.WriteLine(); +Console.WriteLine("Enter an order ID (or 'exit'):"); + +while (true) +{ + Console.Write("> "); + string? input = Console.ReadLine(); + if (string.IsNullOrWhiteSpace(input) || input.Equals("exit", StringComparison.OrdinalIgnoreCase)) + { + break; + } + + try + { + // Start the workflow and stream events to see shared state in action + IStreamingWorkflowRun run = await workflowClient.StreamAsync(orderPipeline, input); + Console.WriteLine($"Started run: {run.RunId}"); + + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + switch (evt) + { + case WorkflowOutputEvent e: + Console.WriteLine($" [Output] {e.SourceId}: {e.Data}"); + break; + + case DurableWorkflowCompletedEvent e: + Console.WriteLine($" Completed: {e.Result}"); + break; + + case DurableWorkflowFailedEvent e: + Console.WriteLine($" Failed: {e.ErrorMessage}"); + break; + } + } + } + catch (Exception ex) + { + Console.WriteLine($"Error: {ex.Message}"); + } + + Console.WriteLine(); +} + +await host.StopAsync(); diff --git a/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/README.md b/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/README.md new file mode 100644 index 0000000000..31ff55ce84 --- /dev/null +++ b/dotnet/samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/README.md @@ -0,0 +1,71 @@ +# Shared State Workflow Sample + +This sample demonstrates how executors in a durable workflow can share state via `IWorkflowContext`. State written by one executor is accessible to all downstream executors, persisted across supersteps, and survives process restarts. + +## Key Concepts Demonstrated + +- Writing state with `QueueStateUpdateAsync` — executors store data for downstream executors +- Reading state with `ReadStateAsync` — executors access data written by earlier executors +- Lazy initialization with `ReadOrInitStateAsync` — initialize state only if not already present +- Custom scopes with `scopeName` — partition state into isolated namespaces (e.g., `"shipping"`) +- Clearing scopes with `QueueClearScopeAsync` — remove all entries under a scope when no longer needed +- Early termination with `RequestHaltAsync` — halt the workflow when validation fails +- State persistence across supersteps — the orchestration passes shared state to each executor +- Event streaming with `IStreamingWorkflowRun` — observe executor progress in real time + +## Workflow + +**OrderPipeline**: `ValidateOrder` → `EnrichOrder` → `ProcessPayment` → `GenerateInvoice` + +Return values carry primary business data through the pipeline (`OrderDetails` → `OrderDetails` → payment ref → invoice string). Shared state carries side-channel data that doesn't belong in the message chain: + +| Executor | Returns (message flow) | Reads from State | Writes to State | +|----------|----------------------|-----------------|-----------------| +| **ValidateOrder** | `OrderDetails` | — | `taxRate`, `auditValidate` | +| **EnrichOrder** | `OrderDetails` (pass-through) | `auditValidate` | `shippingTier`, `auditEnrich`, `carrier` (scope: shipping) | +| **ProcessPayment** | payment ref string | `taxRate` | `auditPayment` | +| **GenerateInvoice** | invoice string | `auditValidate`, `auditEnrich`, `auditPayment`, `carrier` (scope: shipping) | clears `shipping` scope | + +> [!NOTE] +> `EnrichOrder` writes `carrier` under the `"shipping"` scope using `scopeName: "shipping"`. This keeps the key separate from keys written without a scope, so `"carrier"` in the `"shipping"` scope won't collide with a `"carrier"` key written elsewhere. + +## Environment Setup + +See the [README.md](../../README.md) file in the parent directory for more information on how to configure the environment, including how to install and run common sample dependencies. + +## Running the Sample + +```bash +dotnet run +``` + +Enter an order ID when prompted. The workflow will process the order through all four executors, streaming events as they occur: + +```text +> ORD-001 +Started run: abc123 + Wrote to shared state: taxRate = 8.5% + Wrote to shared state: auditValidate + [Output] ValidateOrder: Order 'ORD-001' validated. Customer: Jerry, Amount: $249.99 + Read from shared state: shippingTier = Express + Wrote to shared state: carrier = Contoso Express (scope: shipping) + Read from shared state: auditValidate (previous step: ValidateOrder) + Wrote to shared state: auditEnrich + [Output] EnrichOrder: Order enriched. Shipping: Express (previous step: ValidateOrder) + Read from shared state: taxRate = 8.5% + Wrote to shared state: auditPayment + [Output] ProcessPayment: Payment processed. Total: $271.24 (tax: $21.25). Ref: PAY-abc123def456 + Read from shared state: 3 audit entries + Read from shared state: carrier = Contoso Express (scope: shipping) + Cleared shared state scope: shipping + [Output] GenerateInvoice: Invoice complete. Payment: "PAY-abc123def456". Audit trail: [ValidateOrder → EnrichOrder → ProcessPayment] + Completed: Invoice complete. Payment: "PAY-abc123def456". Audit trail: [ValidateOrder → EnrichOrder → ProcessPayment] +``` + +### Viewing Workflows in the DTS Dashboard + +After running a workflow, you can navigate to the Durable Task Scheduler (DTS) dashboard to inspect the orchestration status, executor inputs/outputs, and events. + +If you are using the DTS emulator, the dashboard is available at `http://localhost:8082`. + +To inspect shared state in the dashboard, click on an executor to view its input and output. The input contains a snapshot of the shared state the executor ran with, and the output includes any state updates it made (as `stateUpdates` with scoped keys). diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/ServiceCollectionExtensions.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/ServiceCollectionExtensions.cs index 22cfc06518..29e56ea398 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/ServiceCollectionExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/ServiceCollectionExtensions.cs @@ -312,11 +312,8 @@ private static WorkflowRegistrationInfo BuildWorkflowRegistration( Dictionary executorBindings = workflow.ReflectExecutors(); List activities = []; - // Filter out AI agents and subworkflows - they are not registered as activities. - // AI agents use Durable Entities for stateful execution, and subworkflows are - // registered as separate orchestrations via BuildWorkflowRegistrationRecursive. foreach (KeyValuePair entry in executorBindings - .Where(e => e.Value is not AIAgentBinding and not SubworkflowBinding)) + .Where(e => IsActivityBinding(e.Value))) { string executorName = WorkflowNamingHelper.GetExecutorName(entry.Key); string activityName = WorkflowNamingHelper.ToOrchestrationFunctionName(executorName); @@ -330,6 +327,15 @@ private static WorkflowRegistrationInfo BuildWorkflowRegistration( return new WorkflowRegistrationInfo(orchestrationName, activities); } + /// + /// Returns for bindings that should be registered as Durable Task activities. + /// (Durable Entities) and (sub-orchestrations) + /// use specialized dispatch and are excluded. + /// + private static bool IsActivityBinding(ExecutorBinding binding) + => binding is not AIAgentBinding + and not SubworkflowBinding; + private static async Task RunWorkflowOrchestrationAsync( TaskOrchestrationContext context, DurableWorkflowInput workflowInput, diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityContext.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityContext.cs deleted file mode 100644 index 3e21f7a75a..0000000000 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityContext.cs +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; -using System.Text.Json; -using Microsoft.Agents.AI.Workflows; - -namespace Microsoft.Agents.AI.DurableTask.Workflows; - -/// -/// A workflow context for durable activity execution. -/// -/// -/// Some of the methods are returning default for this version. Those method will be updated with real implementations in follow up PRs. -/// -[DebuggerDisplay("Executor = {_executor.Id}, StateEntries = {_initialState.Count}")] -internal sealed class DurableActivityContext : IWorkflowContext -{ - private readonly Dictionary _initialState; - private readonly Executor _executor; - - /// - /// Initializes a new instance of the class. - /// - /// The shared state passed from the orchestration. - /// The executor running in this context. - internal DurableActivityContext(Dictionary? initialState, Executor executor) - { - this._executor = executor; - this._initialState = initialState ?? []; - } - - /// - /// Gets the messages sent during activity execution via . - /// - internal List SentMessages { get; } = []; - - /// - public ValueTask AddEventAsync( - WorkflowEvent workflowEvent, - CancellationToken cancellationToken = default) => default; - - /// - [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Serializing workflow message types registered at startup.")] - [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Serializing workflow message types registered at startup.")] - public ValueTask SendMessageAsync( - object message, - string? targetId = null, - CancellationToken cancellationToken = default) - { - if (message is not null) - { - Type messageType = message.GetType(); - this.SentMessages.Add(new SentMessageInfo - { - Message = JsonSerializer.Serialize(message, messageType), - TypeName = messageType.FullName ?? messageType.Name - }); - } - - return default; - } - - /// - public ValueTask YieldOutputAsync( - object output, - CancellationToken cancellationToken = default) => default; - - /// - public ValueTask RequestHaltAsync() => default; - - /// - public ValueTask ReadStateAsync( - string key, - string? scopeName = null, - CancellationToken cancellationToken = default) => default; - - /// - public ValueTask ReadOrInitStateAsync( - string key, - Func initialStateFactory, - string? scopeName = null, - CancellationToken cancellationToken = default) => default; - - /// - public ValueTask> ReadStateKeysAsync( - string? scopeName = null, - CancellationToken cancellationToken = default) => default; - - /// - public ValueTask QueueStateUpdateAsync( - string key, - T? value, - string? scopeName = null, - CancellationToken cancellationToken = default) => default; - - /// - public ValueTask QueueClearScopeAsync( - string? scopeName = null, - CancellationToken cancellationToken = default) => default; - - /// - public IReadOnlyDictionary? TraceContext => null; - - /// - public bool ConcurrentRunsEnabled => false; -} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityExecutor.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityExecutor.cs index 69d6e6c8cc..526a0f00d4 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityExecutor.cs @@ -15,15 +15,6 @@ namespace Microsoft.Agents.AI.DurableTask.Workflows; [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Workflow and executor types are registered at startup.")] internal static class DurableActivityExecutor { - /// - /// Shared JSON options that match the DurableDataConverter settings. - /// - private static readonly JsonSerializerOptions s_jsonOptions = new() - { - PropertyNamingPolicy = JsonNamingPolicy.CamelCase, - PropertyNameCaseInsensitive = true - }; - /// /// Executes an activity using the provided executor binding. /// @@ -53,7 +44,7 @@ internal static async Task ExecuteAsync( Type inputType = ResolveInputType(inputWithState?.InputTypeName, executor.InputTypes); object typedInput = DeserializeInput(executorInput, inputType); - DurableActivityContext workflowContext = new(sharedState, executor); + DurableWorkflowContext workflowContext = new(sharedState, executor); object? result = await executor.ExecuteAsync( typedInput, new TypeId(inputType), @@ -63,19 +54,34 @@ internal static async Task ExecuteAsync( return SerializeActivityOutput(result, workflowContext); } - private static string SerializeActivityOutput(object? result, DurableActivityContext context) + private static string SerializeActivityOutput(object? result, DurableWorkflowContext context) { - DurableActivityOutput output = new() + DurableExecutorOutput output = new() { Result = SerializeResult(result), - SentMessages = context.SentMessages.ConvertAll(m => new SentMessageInfo - { - Message = m.Message, - TypeName = m.TypeName - }) + StateUpdates = context.StateUpdates, + ClearedScopes = [.. context.ClearedScopes], + Events = context.OutboundEvents.ConvertAll(SerializeEvent), + SentMessages = context.SentMessages, + HaltRequested = context.HaltRequested + }; + + return JsonSerializer.Serialize(output, DurableWorkflowJsonContext.Default.DurableExecutorOutput); + } + + /// + /// Serializes a workflow event with type information for proper deserialization. + /// + private static string SerializeEvent(WorkflowEvent evt) + { + Type eventType = evt.GetType(); + TypedPayload wrapper = new() + { + TypeName = eventType.AssemblyQualifiedName, + Data = JsonSerializer.Serialize(evt, eventType, DurableSerialization.Options) }; - return JsonSerializer.Serialize(output, DurableWorkflowJsonContext.Default.DurableActivityOutput); + return JsonSerializer.Serialize(wrapper, DurableWorkflowJsonContext.Default.TypedPayload); } private static string SerializeResult(object? result) @@ -90,7 +96,7 @@ private static string SerializeResult(object? result) return str; } - return JsonSerializer.Serialize(result, result.GetType(), s_jsonOptions); + return JsonSerializer.Serialize(result, result.GetType(), DurableSerialization.Options); } private static DurableActivityInput? TryDeserializeActivityInput(string input) @@ -112,7 +118,7 @@ private static object DeserializeInput(string input, Type targetType) return input; } - return JsonSerializer.Deserialize(input, targetType, s_jsonOptions) + return JsonSerializer.Deserialize(input, targetType, DurableSerialization.Options) ?? throw new InvalidOperationException($"Failed to deserialize input to type '{targetType.Name}'."); } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityOutput.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityOutput.cs deleted file mode 100644 index 537e21c9ef..0000000000 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableActivityOutput.cs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -namespace Microsoft.Agents.AI.DurableTask.Workflows; - -/// -/// Output payload from activity execution, containing the result and other metadata. -/// -internal sealed class DurableActivityOutput -{ - /// - /// Gets or sets the serialized result of the activity. - /// - public string? Result { get; set; } - - /// - /// Gets or sets the collection of messages that have been sent. - /// - public List SentMessages { get; set; } = []; -} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs index 903b2fb127..a0257c6d91 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorDispatcher.cs @@ -31,12 +31,14 @@ internal static class DurableExecutorDispatcher /// The task orchestration context. /// Information about the executor to dispatch. /// The message envelope containing input and type information. + /// The shared state dictionary to pass to the executor. /// The logger for tracing. /// The result from the executor. internal static async Task DispatchAsync( TaskOrchestrationContext context, WorkflowExecutorInfo executorInfo, DurableMessageEnvelope envelope, + Dictionary sharedState, ILogger logger) { logger.LogDispatchingExecutor(executorInfo.ExecutorId, executorInfo.IsAgenticExecutor); @@ -46,14 +48,15 @@ internal static async Task DispatchAsync( return await ExecuteAgentAsync(context, executorInfo, logger, envelope.Message).ConfigureAwait(true); } - return await ExecuteActivityAsync(context, executorInfo, envelope.Message, envelope.InputTypeName).ConfigureAwait(true); + return await ExecuteActivityAsync(context, executorInfo, envelope.Message, envelope.InputTypeName, sharedState).ConfigureAwait(true); } private static async Task ExecuteActivityAsync( TaskOrchestrationContext context, WorkflowExecutorInfo executorInfo, string input, - string? inputTypeName) + string? inputTypeName, + Dictionary sharedState) { string executorName = WorkflowNamingHelper.GetExecutorName(executorInfo.ExecutorId); string activityName = WorkflowNamingHelper.ToOrchestrationFunctionName(executorName); @@ -61,7 +64,8 @@ private static async Task ExecuteActivityAsync( DurableActivityInput activityInput = new() { Input = input, - InputTypeName = inputTypeName + InputTypeName = inputTypeName, + State = sharedState }; string serializedInput = JsonSerializer.Serialize(activityInput, DurableWorkflowJsonContext.Default.DurableActivityInput); diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorOutput.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorOutput.cs new file mode 100644 index 0000000000..ce3f26c14b --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableExecutorOutput.cs @@ -0,0 +1,39 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Output payload from executor execution, containing the result, state updates, and emitted events. +/// +internal sealed class DurableExecutorOutput +{ + /// + /// Gets the executor result. + /// + public string? Result { get; init; } + + /// + /// Gets the state updates (scope-prefixed key to value; null indicates deletion). + /// + public Dictionary StateUpdates { get; init; } = []; + + /// + /// Gets the scope names that were cleared. + /// + public List ClearedScopes { get; init; } = []; + + /// + /// Gets the workflow events emitted during execution. + /// + public List Events { get; init; } = []; + + /// + /// Gets the typed messages sent to downstream executors. + /// + public List SentMessages { get; init; } = []; + + /// + /// Gets a value indicating whether the executor requested a workflow halt. + /// + public bool HaltRequested { get; init; } +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableHaltRequestedEvent.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableHaltRequestedEvent.cs new file mode 100644 index 0000000000..6c7aacfc48 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableHaltRequestedEvent.cs @@ -0,0 +1,25 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows; + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Event raised when an executor requests the workflow to halt via . +/// +public sealed class DurableHaltRequestedEvent : WorkflowEvent +{ + /// + /// Initializes a new instance of the class. + /// + /// The ID of the executor that requested the halt. + public DurableHaltRequestedEvent(string executorId) : base($"Halt requested by {executorId}") + { + this.ExecutorId = executorId; + } + + /// + /// Gets the ID of the executor that requested the halt. + /// + public string ExecutorId { get; } +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableRunStatus.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableRunStatus.cs new file mode 100644 index 0000000000..4ed2049dc9 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableRunStatus.cs @@ -0,0 +1,49 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Represents the execution status of a durable workflow run. +/// +public enum DurableRunStatus +{ + /// + /// The workflow instance was not found. + /// + NotFound, + + /// + /// The workflow is pending and has not started. + /// + Pending, + + /// + /// The workflow is currently running. + /// + Running, + + /// + /// The workflow completed successfully. + /// + Completed, + + /// + /// The workflow failed with an error. + /// + Failed, + + /// + /// The workflow was terminated. + /// + Terminated, + + /// + /// The workflow is suspended. + /// + Suspended, + + /// + /// The workflow status is unknown. + /// + Unknown +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs new file mode 100644 index 0000000000..245ec36fb8 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableSerialization.cs @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json; + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Shared serialization options for user-defined workflow types that are not known at compile time +/// and therefore cannot use the source-generated . +/// +internal static class DurableSerialization +{ + /// + /// Gets the shared for workflow serialization + /// with camelCase naming and case-insensitive deserialization. + /// + internal static JsonSerializerOptions Options { get; } = new() + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + PropertyNameCaseInsensitive = true + }; +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs new file mode 100644 index 0000000000..57a44fc06b --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableStreamingWorkflowRun.cs @@ -0,0 +1,408 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Text.Json; +using Microsoft.Agents.AI.Workflows; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Represents a durable workflow run that supports streaming workflow events as they occur. +/// +/// +/// Events are detected by monitoring the orchestration's custom status at regular intervals. +/// When executors emit events via or +/// , they are written to the orchestration's +/// custom status and picked up by this streaming run. +/// +[DebuggerDisplay("{WorkflowName} ({RunId})")] +internal sealed class DurableStreamingWorkflowRun : IStreamingWorkflowRun +{ + private readonly DurableTaskClient _client; + + /// + /// Initializes a new instance of the class. + /// + /// The durable task client for orchestration operations. + /// The unique instance ID for this orchestration run. + /// The workflow being executed. + internal DurableStreamingWorkflowRun(DurableTaskClient client, string instanceId, Workflow workflow) + { + this._client = client; + this.RunId = instanceId; + this.WorkflowName = workflow.Name ?? string.Empty; + } + + /// + public string RunId { get; } + + /// + /// Gets the name of the workflow being executed. + /// + public string WorkflowName { get; } + + /// + /// Gets the current execution status of the workflow run. + /// + /// A cancellation token to observe. + /// The current status of the durable run. + public async ValueTask GetStatusAsync(CancellationToken cancellationToken = default) + { + OrchestrationMetadata? metadata = await this._client.GetInstanceAsync( + this.RunId, + getInputsAndOutputs: false, + cancellation: cancellationToken).ConfigureAwait(false); + + if (metadata is null) + { + return DurableRunStatus.NotFound; + } + + return metadata.RuntimeStatus switch + { + OrchestrationRuntimeStatus.Pending => DurableRunStatus.Pending, + OrchestrationRuntimeStatus.Running => DurableRunStatus.Running, + OrchestrationRuntimeStatus.Completed => DurableRunStatus.Completed, + OrchestrationRuntimeStatus.Failed => DurableRunStatus.Failed, + OrchestrationRuntimeStatus.Terminated => DurableRunStatus.Terminated, + OrchestrationRuntimeStatus.Suspended => DurableRunStatus.Suspended, + _ => DurableRunStatus.Unknown + }; + } + + /// + public IAsyncEnumerable WatchStreamAsync(CancellationToken cancellationToken = default) + => this.WatchStreamAsync(pollingInterval: null, cancellationToken); + + /// + /// Asynchronously streams workflow events as they occur during workflow execution. + /// + /// The interval between status checks. Defaults to 100ms. + /// A cancellation token to observe. + /// An asynchronous stream of objects. + private async IAsyncEnumerable WatchStreamAsync( + TimeSpan? pollingInterval, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + TimeSpan minInterval = pollingInterval ?? TimeSpan.FromMilliseconds(100); + TimeSpan maxInterval = TimeSpan.FromSeconds(2); + TimeSpan currentInterval = minInterval; + + // Track how many events we've already read from custom status + int lastReadEventIndex = 0; + + while (!cancellationToken.IsCancellationRequested) + { + // Poll with getInputsAndOutputs: true because SerializedCustomStatus + // (used for event streaming) is only populated when this flag is set. + OrchestrationMetadata? metadata = await this._client.GetInstanceAsync( + this.RunId, + getInputsAndOutputs: true, + cancellation: cancellationToken).ConfigureAwait(false); + + if (metadata is null) + { + yield break; + } + + bool hasNewEvents = false; + + // Always drain any unread events from custom status before checking terminal states. + // The orchestration may complete before the next poll, so events would be lost if we + // check terminal status first. + if (metadata.SerializedCustomStatus is not null) + { + if (TryParseCustomStatus(metadata.SerializedCustomStatus, out DurableWorkflowCustomStatus customStatus)) + { + (List events, lastReadEventIndex) = DrainNewEvents(customStatus.Events, lastReadEventIndex); + foreach (WorkflowEvent evt in events) + { + hasNewEvents = true; + yield return evt; + } + } + } + + // Check terminal states after draining events from custom status + if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Completed) + { + // The framework clears custom status on completion, so events may be in + // SerializedOutput as a DurableWorkflowResult wrapper. + if (TryParseWorkflowResult(metadata.SerializedOutput, out DurableWorkflowResult? outputResult)) + { + (List events, _) = DrainNewEvents(outputResult.Events, lastReadEventIndex); + foreach (WorkflowEvent evt in events) + { + yield return evt; + } + + yield return new DurableWorkflowCompletedEvent(outputResult.Result); + } + else + { + // The runner always wraps output in DurableWorkflowResult, so a parse + // failure here indicates a bug. Yield a failed event so the consumer + // gets a visible, handleable signal without crashing. + yield return new DurableWorkflowFailedEvent( + $"Workflow '{this.WorkflowName}' (RunId: {this.RunId}) completed but its output could not be parsed as DurableWorkflowResult."); + } + + yield break; + } + + if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Failed) + { + string errorMessage = metadata.FailureDetails?.ErrorMessage ?? "Workflow execution failed."; + yield return new DurableWorkflowFailedEvent(errorMessage, metadata.FailureDetails); + yield break; + } + + if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Terminated) + { + yield return new DurableWorkflowFailedEvent("Workflow was terminated."); + yield break; + } + + // Adaptive backoff: reset to minimum when events were found, increase otherwise + currentInterval = hasNewEvents + ? minInterval + : TimeSpan.FromMilliseconds(Math.Min(currentInterval.TotalMilliseconds * 2, maxInterval.TotalMilliseconds)); + + try + { + await Task.Delay(currentInterval, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + yield break; + } + } + } + + /// + /// Waits for the workflow to complete and returns the result. + /// + /// The expected result type. + /// A cancellation token to observe. + /// The result of the workflow execution. + /// Thrown when the workflow failed. + /// Thrown when the workflow was terminated or ended with an unexpected status. + public async ValueTask WaitForCompletionAsync(CancellationToken cancellationToken = default) + { + OrchestrationMetadata metadata = await this._client.WaitForInstanceCompletionAsync( + this.RunId, + getInputsAndOutputs: true, + cancellation: cancellationToken).ConfigureAwait(false); + + if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Completed) + { + return ExtractResult(metadata.SerializedOutput); + } + + if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Failed) + { + if (metadata.FailureDetails is not null) + { + throw new TaskFailedException( + taskName: this.WorkflowName, + taskId: -1, + failureDetails: metadata.FailureDetails); + } + + throw new InvalidOperationException( + $"Workflow '{this.WorkflowName}' (RunId: {this.RunId}) failed without failure details."); + } + + throw new InvalidOperationException( + $"Workflow '{this.WorkflowName}' (RunId: {this.RunId}) ended with unexpected status: {metadata.RuntimeStatus}"); + } + + /// + /// Deserializes and returns any events beyond from the list. + /// + private static (List Events, int UpdatedIndex) DrainNewEvents(List serializedEvents, int lastReadIndex) + { + List events = []; + while (lastReadIndex < serializedEvents.Count) + { + string serializedEvent = serializedEvents[lastReadIndex]; + lastReadIndex++; + + WorkflowEvent? workflowEvent = TryDeserializeEvent(serializedEvent); + if (workflowEvent is not null) + { + events.Add(workflowEvent); + } + } + + return (events, lastReadIndex); + } + + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow custom status.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow custom status.")] + private static bool TryParseCustomStatus(string serializedStatus, out DurableWorkflowCustomStatus result) + { + try + { + result = JsonSerializer.Deserialize(serializedStatus, DurableWorkflowJsonContext.Default.DurableWorkflowCustomStatus)!; + return result is not null; + } + catch (JsonException) + { + result = default!; + return false; + } + } + + /// + /// Attempts to parse the orchestration output as a wrapper. + /// + /// + /// The orchestration wraps its output in a to include + /// accumulated events alongside the result. The Durable Task framework's DataConverter + /// serializes the string output with an extra layer of JSON encoding, so we first unwrap that. + /// + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow result wrapper.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow result wrapper.")] + private static bool TryParseWorkflowResult(string? serializedOutput, [NotNullWhen(true)] out DurableWorkflowResult? result) + { + if (serializedOutput is null) + { + result = default!; + return false; + } + + try + { + // The DurableDataConverter wraps string results in JSON quotes, so + // SerializedOutput is a JSON-encoded string like "\"{ ... }\"". + // We need to unwrap the outer JSON string first. + string? innerJson = JsonSerializer.Deserialize(serializedOutput); + if (innerJson is null) + { + result = default!; + return false; + } + + result = JsonSerializer.Deserialize(innerJson, DurableWorkflowJsonContext.Default.DurableWorkflowResult)!; + return result is not null; + } + catch (JsonException) + { + result = default!; + return false; + } + } + + /// + /// Extracts a typed result from the orchestration output by unwrapping the + /// wrapper. + /// + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow result.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow result.")] + internal static TResult? ExtractResult(string? serializedOutput) + { + if (serializedOutput is null) + { + return default; + } + + if (!TryParseWorkflowResult(serializedOutput, out DurableWorkflowResult? workflowResult)) + { + throw new InvalidOperationException( + "Failed to parse orchestration output as DurableWorkflowResult. " + + "The orchestration runner should always wrap output in this format."); + } + + string? resultJson = workflowResult.Result; + + if (resultJson is null) + { + return default; + } + + if (typeof(TResult) == typeof(string)) + { + return (TResult)(object)resultJson; + } + + return JsonSerializer.Deserialize(resultJson, DurableSerialization.Options); + } + + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow event types.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow event types.")] + [UnconditionalSuppressMessage("Trimming", "IL2057", Justification = "Event types are registered at startup.")] + private static WorkflowEvent? TryDeserializeEvent(string serializedEvent) + { + try + { + TypedPayload? wrapper = JsonSerializer.Deserialize( + serializedEvent, + DurableWorkflowJsonContext.Default.TypedPayload); + + if (wrapper?.TypeName is not null && wrapper.Data is not null) + { + Type? eventType = Type.GetType(wrapper.TypeName); + if (eventType is not null) + { + return DeserializeEventByType(eventType, wrapper.Data); + } + } + + return null; + } + catch (JsonException) + { + return null; + } + } + + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow event types.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow event types.")] + private static WorkflowEvent? DeserializeEventByType(Type eventType, string json) + { + // Types with internal constructors need manual deserialization + if (eventType == typeof(ExecutorInvokedEvent) + || eventType == typeof(ExecutorCompletedEvent) + || eventType == typeof(WorkflowOutputEvent)) + { + using JsonDocument doc = JsonDocument.Parse(json); + JsonElement root = doc.RootElement; + + if (eventType == typeof(ExecutorInvokedEvent)) + { + string executorId = root.GetProperty("executorId").GetString() ?? string.Empty; + JsonElement? data = GetDataProperty(root); + return new ExecutorInvokedEvent(executorId, data!); + } + + if (eventType == typeof(ExecutorCompletedEvent)) + { + string executorId = root.GetProperty("executorId").GetString() ?? string.Empty; + JsonElement? data = GetDataProperty(root); + return new ExecutorCompletedEvent(executorId, data); + } + + // WorkflowOutputEvent + string sourceId = root.GetProperty("sourceId").GetString() ?? string.Empty; + object? outputData = GetDataProperty(root); + return new WorkflowOutputEvent(outputData!, sourceId); + } + + return JsonSerializer.Deserialize(json, eventType, DurableSerialization.Options) as WorkflowEvent; + } + + private static JsonElement? GetDataProperty(JsonElement root) + { + if (!root.TryGetProperty("data", out JsonElement dataElement)) + { + return null; + } + + return dataElement.ValueKind == JsonValueKind.Null ? null : dataElement.Clone(); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowClient.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowClient.cs index 6b01a39bdf..5944d578ef 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowClient.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowClient.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using Microsoft.Agents.AI.Workflows; using Microsoft.DurableTask; @@ -58,4 +58,38 @@ public ValueTask RunAsync( string? runId = null, CancellationToken cancellationToken = default) => this.RunAsync(workflow, input, runId, cancellationToken); + + /// + public async ValueTask StreamAsync( + Workflow workflow, + TInput input, + string? runId = null, + CancellationToken cancellationToken = default) + where TInput : notnull + { + ArgumentNullException.ThrowIfNull(workflow); + + if (string.IsNullOrEmpty(workflow.Name)) + { + throw new ArgumentException("Workflow must have a valid Name property.", nameof(workflow)); + } + + DurableWorkflowInput workflowInput = new() { Input = input }; + + string instanceId = await this._client.ScheduleNewOrchestrationInstanceAsync( + orchestratorName: WorkflowNamingHelper.ToOrchestrationFunctionName(workflow.Name), + input: workflowInput, + options: runId is not null ? new StartOrchestrationOptions(runId) : null, + cancellation: cancellationToken).ConfigureAwait(false); + + return new DurableStreamingWorkflowRun(this._client, instanceId, workflow); + } + + /// + public ValueTask StreamAsync( + Workflow workflow, + string input, + string? runId = null, + CancellationToken cancellationToken = default) + => this.StreamAsync(workflow, input, runId, cancellationToken); } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCompletedEvent.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCompletedEvent.cs new file mode 100644 index 0000000000..a4de6d1d50 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCompletedEvent.cs @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Diagnostics; +using Microsoft.Agents.AI.Workflows; + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Event raised when a durable workflow completes successfully. +/// +[DebuggerDisplay("Completed: {Result}")] +public sealed class DurableWorkflowCompletedEvent : WorkflowEvent +{ + /// + /// Initializes a new instance of the class. + /// + /// The serialized result of the workflow. + public DurableWorkflowCompletedEvent(string? result) : base(result) + { + this.Result = result; + } + + /// + /// Gets the serialized result of the workflow. + /// + public string? Result { get; } +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowContext.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowContext.cs new file mode 100644 index 0000000000..9ddb337561 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowContext.cs @@ -0,0 +1,327 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Text.Json; +using Microsoft.Agents.AI.Workflows; + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// A workflow context for durable workflow execution. +/// +/// +/// State is passed in from the orchestration and updates are collected for return. +/// Events emitted during execution are collected and returned to the orchestration +/// as part of the activity output for streaming to callers. +/// +[DebuggerDisplay("Executor = {_executor.Id}, StateEntries = {_initialState.Count}")] +internal sealed class DurableWorkflowContext : IWorkflowContext +{ + /// + /// The default scope name used when no explicit scope is specified. + /// Scopes partition shared state into logical namespaces so that different + /// parts of a workflow can manage their state keys independently. + /// + private const string DefaultScopeName = "__default__"; + + private readonly Dictionary _initialState; + private readonly Executor _executor; + + /// + /// Initializes a new instance of the class. + /// + /// The shared state passed from the orchestration. + /// The executor running in this context. + internal DurableWorkflowContext(Dictionary? initialState, Executor executor) + { + this._executor = executor; + this._initialState = initialState ?? []; + } + + /// + /// Gets the messages sent during activity execution via . + /// + internal List SentMessages { get; } = []; + + /// + /// Gets the outbound events that were added during activity execution. + /// + internal List OutboundEvents { get; } = []; + + /// + /// Gets the state updates made during activity execution. + /// + internal Dictionary StateUpdates { get; } = []; + + /// + /// Gets the scopes that were cleared during activity execution. + /// + internal HashSet ClearedScopes { get; } = []; + + /// + /// Gets a value indicating whether the executor requested a workflow halt. + /// + internal bool HaltRequested { get; private set; } + + /// + public ValueTask AddEventAsync( + WorkflowEvent workflowEvent, + CancellationToken cancellationToken = default) + { + if (workflowEvent is not null) + { + this.OutboundEvents.Add(workflowEvent); + } + + return default; + } + + /// + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Serializing workflow message types registered at startup.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Serializing workflow message types registered at startup.")] + public ValueTask SendMessageAsync( + object message, + string? targetId = null, + CancellationToken cancellationToken = default) + { + if (message is not null) + { + Type messageType = message.GetType(); + this.SentMessages.Add(new TypedPayload + { + Data = JsonSerializer.Serialize(message, messageType, DurableSerialization.Options), + TypeName = messageType.AssemblyQualifiedName + }); + } + + return default; + } + + /// + public ValueTask YieldOutputAsync( + object output, + CancellationToken cancellationToken = default) + { + if (output is not null) + { + Type outputType = output.GetType(); + if (!this._executor.CanOutput(outputType)) + { + throw new InvalidOperationException( + $"Cannot output object of type {outputType.Name}. " + + $"Expecting one of [{string.Join(", ", this._executor.OutputTypes)}]."); + } + + this.OutboundEvents.Add(new WorkflowOutputEvent(output, this._executor.Id)); + } + + return default; + } + + /// + public ValueTask RequestHaltAsync() + { + this.HaltRequested = true; + this.OutboundEvents.Add(new DurableHaltRequestedEvent(this._executor.Id)); + return default; + } + + /// + public ValueTask ReadStateAsync( + string key, + string? scopeName = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrEmpty(key); + + string scopeKey = GetScopeKey(scopeName, key); + string normalizedScope = scopeName ?? DefaultScopeName; + bool scopeCleared = this.ClearedScopes.Contains(normalizedScope); + + // Local updates take priority over initial state. + if (this.StateUpdates.TryGetValue(scopeKey, out string? updated)) + { + return DeserializeStateAsync(updated); + } + + // If scope was cleared, ignore initial state + if (scopeCleared) + { + return ValueTask.FromResult(default); + } + + // Fall back to initial state passed from orchestration + if (this._initialState.TryGetValue(scopeKey, out string? initial)) + { + return DeserializeStateAsync(initial); + } + + return ValueTask.FromResult(default); + } + + /// + public async ValueTask ReadOrInitStateAsync( + string key, + Func initialStateFactory, + string? scopeName = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrEmpty(key); + ArgumentNullException.ThrowIfNull(initialStateFactory); + + // Cannot rely on `value is not null` because T? on an unconstrained generic + // parameter does not become Nullable for value types — the null check is + // always true for types like int. Instead, check key existence directly. + if (this.HasStateKey(key, scopeName)) + { + T? value = await this.ReadStateAsync(key, scopeName, cancellationToken).ConfigureAwait(false); + if (value is not null) + { + return value; + } + } + + T initialValue = initialStateFactory(); + await this.QueueStateUpdateAsync(key, initialValue, scopeName, cancellationToken).ConfigureAwait(false); + return initialValue; + } + + /// + public ValueTask> ReadStateKeysAsync( + string? scopeName = null, + CancellationToken cancellationToken = default) + { + string scopePrefix = GetScopePrefix(scopeName); + int scopePrefixLength = scopePrefix.Length; + HashSet keys = new(StringComparer.Ordinal); + + bool scopeCleared = scopeName is null + ? this.ClearedScopes.Contains(DefaultScopeName) + : this.ClearedScopes.Contains(scopeName); + + // Start with keys from initial state (skip if scope was cleared) + if (!scopeCleared) + { + foreach (string stateKey in this._initialState.Keys) + { + if (stateKey.StartsWith(scopePrefix, StringComparison.Ordinal)) + { + keys.Add(stateKey[scopePrefixLength..]); + } + } + } + + // Merge local updates: add if non-null, remove if null (deleted) + foreach (KeyValuePair update in this.StateUpdates) + { + if (!update.Key.StartsWith(scopePrefix, StringComparison.Ordinal)) + { + continue; + } + + string key = update.Key[scopePrefixLength..]; + if (update.Value is not null) + { + keys.Add(key); + } + else + { + keys.Remove(key); + } + } + + return ValueTask.FromResult(keys); + } + + /// + public ValueTask QueueStateUpdateAsync( + string key, + T? value, + string? scopeName = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrEmpty(key); + + string scopeKey = GetScopeKey(scopeName, key); + this.StateUpdates[scopeKey] = value is null ? null : SerializeState(value); + return default; + } + + /// + public ValueTask QueueClearScopeAsync( + string? scopeName = null, + CancellationToken cancellationToken = default) + { + this.ClearedScopes.Add(scopeName ?? DefaultScopeName); + + // Remove any pending updates in this scope (snapshot keys to allow removal during iteration) + string scopePrefix = GetScopePrefix(scopeName); + foreach (string key in this.StateUpdates.Keys.ToList()) + { + if (key.StartsWith(scopePrefix, StringComparison.Ordinal)) + { + this.StateUpdates.Remove(key); + } + } + + return default; + } + + /// + public IReadOnlyDictionary? TraceContext => null; + + /// + public bool ConcurrentRunsEnabled => false; + + private static string GetScopeKey(string? scopeName, string key) + => $"{GetScopePrefix(scopeName)}{key}"; + + /// + /// Checks whether the given key exists in local updates or initial state, + /// respecting cleared scopes. + /// + private bool HasStateKey(string key, string? scopeName) + { + string scopeKey = GetScopeKey(scopeName, key); + + if (this.StateUpdates.TryGetValue(scopeKey, out string? updated)) + { + return updated is not null; + } + + string normalizedScope = scopeName ?? DefaultScopeName; + if (this.ClearedScopes.Contains(normalizedScope)) + { + return false; + } + + return this._initialState.ContainsKey(scopeKey); + } + + /// + /// Returns the key prefix for the given scope. Scopes partition shared state + /// into logical namespaces, allowing different workflow executors to manage + /// their state keys independently. When no scope is specified, the + /// is used. + /// + private static string GetScopePrefix(string? scopeName) + => scopeName is null ? $"{DefaultScopeName}:" : $"{scopeName}:"; + + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Serializing workflow state types.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Serializing workflow state types.")] + private static string SerializeState(T value) + => JsonSerializer.Serialize(value, DurableSerialization.Options); + + [UnconditionalSuppressMessage("AOT", "IL3050", Justification = "Deserializing workflow state types.")] + [UnconditionalSuppressMessage("Trimming", "IL2026", Justification = "Deserializing workflow state types.")] + private static ValueTask DeserializeStateAsync(string? json) + { + if (json is null) + { + return ValueTask.FromResult(default); + } + + return ValueTask.FromResult(JsonSerializer.Deserialize(json, DurableSerialization.Options)); + } +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCustomStatus.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCustomStatus.cs new file mode 100644 index 0000000000..f6d403e861 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowCustomStatus.cs @@ -0,0 +1,22 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Represents the custom status written by the orchestration for streaming consumption. +/// +/// +/// The Durable Task framework exposes SerializedCustomStatus on orchestration metadata, +/// which is the only orchestration state readable by external clients while the orchestration +/// is still running. The orchestrator writes this object via SetCustomStatus after each +/// superstep so that can poll for new events. +/// On orchestration completion the framework clears custom status, so events are also +/// embedded in the output via . +/// +internal sealed class DurableWorkflowCustomStatus +{ + /// + /// Gets or sets the serialized workflow events emitted so far. + /// + public List Events { get; set; } = []; +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowFailedEvent.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowFailedEvent.cs new file mode 100644 index 0000000000..4f1e411be6 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowFailedEvent.cs @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Diagnostics; +using Microsoft.Agents.AI.Workflows; +using Microsoft.DurableTask; + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Event raised when a durable workflow fails. +/// +[DebuggerDisplay("Failed: {ErrorMessage}")] +public sealed class DurableWorkflowFailedEvent : WorkflowEvent +{ + /// + /// Initializes a new instance of the class. + /// + /// The error message describing the failure. + /// The full failure details from the Durable Task runtime, if available. + public DurableWorkflowFailedEvent(string errorMessage, TaskFailureDetails? failureDetails = null) : base(errorMessage) + { + this.ErrorMessage = errorMessage; + this.FailureDetails = failureDetails; + } + + /// + /// Gets the error message describing the failure. + /// + public string ErrorMessage { get; } + + /// + /// Gets the full failure details from the Durable Task runtime, including error type, stack trace, and inner failure. + /// + public TaskFailureDetails? FailureDetails { get; } +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowJsonContext.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowJsonContext.cs index 3e89f6ca79..9058c41e0a 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowJsonContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowJsonContext.cs @@ -14,8 +14,9 @@ namespace Microsoft.Agents.AI.DurableTask.Workflows; /// /// /// : Activity input wrapper with state -/// : Activity output wrapper with results and events -/// : Messages sent via SendMessageAsync +/// : Executor output wrapper with results, events, and state updates +/// : Serialized payload wrapper with type info (events and messages) +/// : Custom status for streaming consumption /// /// /// Note: User-defined executor input/output types still use reflection-based serialization @@ -27,8 +28,12 @@ namespace Microsoft.Agents.AI.DurableTask.Workflows; DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase)] [JsonSerializable(typeof(DurableActivityInput))] -[JsonSerializable(typeof(DurableActivityOutput))] -[JsonSerializable(typeof(SentMessageInfo))] -[JsonSerializable(typeof(List))] +[JsonSerializable(typeof(DurableExecutorOutput))] +[JsonSerializable(typeof(TypedPayload))] +[JsonSerializable(typeof(List))] +[JsonSerializable(typeof(DurableWorkflowCustomStatus))] +[JsonSerializable(typeof(DurableWorkflowResult))] +[JsonSerializable(typeof(List))] +[JsonSerializable(typeof(Dictionary))] [JsonSerializable(typeof(Dictionary))] internal partial class DurableWorkflowJsonContext : JsonSerializerContext; diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowResult.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowResult.cs new file mode 100644 index 0000000000..933fd74c62 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowResult.cs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Wraps the orchestration output to include both the workflow result and accumulated events. +/// +/// +/// The Durable Task framework clears SerializedCustomStatus when an orchestration +/// completes. To ensure streaming clients can retrieve events even after completion, +/// the accumulated events are embedded in the orchestration output alongside the result. +/// +internal sealed class DurableWorkflowResult +{ + /// + /// Gets or sets the serialized result of the workflow execution. + /// + public string? Result { get; set; } + + /// + /// Gets or sets the serialized workflow events emitted during execution. + /// + public List Events { get; set; } = []; +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRun.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRun.cs index 8a9dbe7f6c..aeb42f4fb6 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRun.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRun.cs @@ -55,7 +55,7 @@ internal DurableWorkflowRun(DurableTaskClient client, string instanceId, string if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Completed) { - return metadata.ReadOutputAs(); + return DurableStreamingWorkflowRun.ExtractResult(metadata.SerializedOutput); } if (metadata.RuntimeStatus == OrchestrationRuntimeStatus.Failed) diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs index ad49acd0b9..d133d16919 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/DurableWorkflowRunner.cs @@ -171,25 +171,42 @@ private static async Task RunSuperstepLoopAsync( logger.LogSuperstepExecutors(superstep, string.Join(", ", executorInputs.Select(e => e.ExecutorId))); } - string[] results = await DispatchExecutorsInParallelAsync(context, executorInputs, logger).ConfigureAwait(true); + string[] results = await DispatchExecutorsInParallelAsync(context, executorInputs, state.SharedState, logger).ConfigureAwait(true); - ProcessSuperstepResults(executorInputs, results, state, logger); + bool haltRequested = ProcessSuperstepResults(executorInputs, results, state, context, logger); + + if (haltRequested) + { + break; + } // Check if we've reached the limit and still have work remaining - if (superstep == MaxSupersteps) + int remainingExecutors = CountRemainingExecutors(state.MessageQueues); + if (superstep == MaxSupersteps && remainingExecutors > 0) { - int remainingExecutors = CountRemainingExecutors(state.MessageQueues); - if (remainingExecutors > 0) - { - logger.LogWorkflowMaxSuperstepsExceeded(context.InstanceId, MaxSupersteps, remainingExecutors); - } + logger.LogWorkflowMaxSuperstepsExceeded(context.InstanceId, MaxSupersteps, remainingExecutors); } } + // Publish final events for live streaming (skip during replay) + if (!context.IsReplaying) + { + PublishEventsToCustomStatus(context, state); + } + string finalResult = GetFinalResult(state.LastResults); logger.LogWorkflowCompleted(); - return finalResult; + // Return wrapper with both result and events so streaming clients can + // retrieve events from SerializedOutput after the orchestration completes + // (SerializedCustomStatus is cleared by the framework on completion). + DurableWorkflowResult workflowResult = new() + { + Result = finalResult, + Events = state.AccumulatedEvents + }; + + return JsonSerializer.Serialize(workflowResult, DurableWorkflowJsonContext.Default.DurableWorkflowResult); } /// @@ -203,10 +220,11 @@ private static int CountRemainingExecutors(Dictionary DispatchExecutorsInParallelAsync( TaskOrchestrationContext context, List executorInputs, + Dictionary sharedState, ILogger logger) { Task[] dispatchTasks = executorInputs - .Select(input => DurableExecutorDispatcher.DispatchAsync(context, input.Info, input.Envelope, logger)) + .Select(input => DurableExecutorDispatcher.DispatchAsync(context, input.Info, input.Envelope, sharedState, logger)) .ToArray(); return await Task.WhenAll(dispatchTasks).ConfigureAwait(true); @@ -242,6 +260,16 @@ public SuperstepState(Workflow workflow, DurableEdgeMap edgeMap) public Dictionary> MessageQueues { get; } = []; public Dictionary LastResults { get; } = []; + + /// + /// Shared state dictionary across supersteps (scope-prefixed key -> serialized value). + /// + public Dictionary SharedState { get; } = []; + + /// + /// Accumulated workflow events for custom status (streaming consumption). + /// + public List AccumulatedEvents { get; } = []; } /// @@ -322,22 +350,132 @@ private static DurableMessageEnvelope AggregateQueueMessages( /// /// Processes results from a superstep, updating state and routing messages to successors. /// - private static void ProcessSuperstepResults( + /// true if a halt was requested by any executor; otherwise, false. + private static bool ProcessSuperstepResults( List inputs, string[] rawResults, SuperstepState state, + TaskOrchestrationContext context, ILogger logger) { + bool haltRequested = false; + for (int i = 0; i < inputs.Count; i++) { string executorId = inputs[i].ExecutorId; - (string result, List sentMessages) = ParseActivityResult(rawResults[i]); + ExecutorResultInfo resultInfo = ParseActivityResult(rawResults[i]); + + logger.LogExecutorResultReceived(executorId, resultInfo.Result.Length, resultInfo.SentMessages.Count); + + state.LastResults[executorId] = resultInfo.Result; + + // Merge state updates from activity into shared state + MergeStateUpdates(state, resultInfo.StateUpdates, resultInfo.ClearedScopes); + + // Accumulate events for custom status (streaming) + state.AccumulatedEvents.AddRange(resultInfo.Events); - logger.LogExecutorResultReceived(executorId, result.Length, sentMessages.Count); + // Check for halt request + haltRequested |= resultInfo.HaltRequested; - state.LastResults[executorId] = result; - RouteOutputToSuccessors(executorId, result, sentMessages, state, logger); + // Publish events for live streaming (skip during replay) + if (!context.IsReplaying) + { + PublishEventsToCustomStatus(context, state); + } + + RouteOutputToSuccessors(executorId, resultInfo.Result, resultInfo.SentMessages, state, logger); } + + return haltRequested; + } + + /// + /// Merges state updates from an executor into the shared state. + /// + /// + /// When concurrent executors in the same superstep modify keys in the same scope, + /// last-write-wins semantics apply. + /// + private static void MergeStateUpdates( + SuperstepState state, + Dictionary stateUpdates, + List clearedScopes) + { + Dictionary shared = state.SharedState; + + ApplyClearedScopes(shared, clearedScopes); + + // Apply individual state updates + foreach ((string key, string? value) in stateUpdates) + { + if (value is null) + { + shared.Remove(key); + } + else + { + shared[key] = value; + } + } + } + + /// + /// Removes all keys belonging to the specified scopes from the shared state dictionary. + /// + private static void ApplyClearedScopes(Dictionary shared, List clearedScopes) + { + if (clearedScopes.Count == 0 || shared.Count == 0) + { + return; + } + + List keysToRemove = []; + + foreach (string clearedScope in clearedScopes) + { + string scopePrefix = string.Concat(clearedScope, ":"); + keysToRemove.Clear(); + + foreach (string key in shared.Keys) + { + if (key.StartsWith(scopePrefix, StringComparison.Ordinal)) + { + keysToRemove.Add(key); + } + } + + foreach (string key in keysToRemove) + { + shared.Remove(key); + } + + if (shared.Count == 0) + { + break; + } + } + } + + /// + /// Publishes accumulated workflow events to the orchestration's custom status, + /// making them available to for live streaming. + /// + /// + /// Custom status is the only orchestration metadata readable by external clients while + /// the orchestration is still running. It is cleared by the framework on completion, + /// so events are also included in for final retrieval. + /// + private static void PublishEventsToCustomStatus(TaskOrchestrationContext context, SuperstepState state) + { + DurableWorkflowCustomStatus customStatus = new() + { + Events = state.AccumulatedEvents + }; + + // Pass the object directly — the framework's DataConverter handles serialization. + // Pre-serializing would cause double-serialization (string wrapped in JSON quotes). + context.SetCustomStatus(customStatus); } /// @@ -346,16 +484,16 @@ private static void ProcessSuperstepResults( private static void RouteOutputToSuccessors( string executorId, string result, - List sentMessages, + List sentMessages, SuperstepState state, ILogger logger) { if (sentMessages.Count > 0) { // Only route messages that have content - foreach (SentMessageInfo message in sentMessages.Where(m => !string.IsNullOrEmpty(m.Message))) + foreach (TypedPayload message in sentMessages.Where(m => !string.IsNullOrEmpty(m.Data))) { - state.EdgeMap.RouteMessage(executorId, message.Message!, message.TypeName, state.MessageQueues, logger); + state.EdgeMap.RouteMessage(executorId, message.Data!, message.TypeName, state.MessageQueues, logger); } return; @@ -406,31 +544,49 @@ private static string GetFinalResult(Dictionary lastResults) } /// - /// Parses the raw activity result to extract the result string and any sent messages. + /// Output from an executor invocation, including its result, + /// messages, state updates, and emitted workflow events. + /// + private sealed record ExecutorResultInfo( + string Result, + List SentMessages, + Dictionary StateUpdates, + List ClearedScopes, + List Events, + bool HaltRequested); + + /// + /// Parses the raw activity result to extract result, messages, events, and state updates. /// - private static (string Result, List SentMessages) ParseActivityResult(string rawResult) + private static ExecutorResultInfo ParseActivityResult(string rawResult) { if (string.IsNullOrEmpty(rawResult)) { - return (rawResult, []); + return new ExecutorResultInfo(rawResult, [], [], [], [], false); } try { - DurableActivityOutput? output = JsonSerializer.Deserialize( + DurableExecutorOutput? output = JsonSerializer.Deserialize( rawResult, - DurableWorkflowJsonContext.Default.DurableActivityOutput); + DurableWorkflowJsonContext.Default.DurableExecutorOutput); if (output is null || !HasMeaningfulContent(output)) { - return (rawResult, []); + return new ExecutorResultInfo(rawResult, [], [], [], [], false); } - return (output.Result ?? string.Empty, output.SentMessages); + return new ExecutorResultInfo( + output.Result ?? string.Empty, + output.SentMessages, + output.StateUpdates, + output.ClearedScopes, + output.Events, + output.HaltRequested); } catch (JsonException) { - return (rawResult, []); + return new ExecutorResultInfo(rawResult, [], [], [], [], false); } } @@ -441,8 +597,13 @@ private static (string Result, List SentMessages) ParseActivity /// Distinguishes actual activity output from arbitrary JSON that deserialized /// successfully but with all default/empty values. /// - private static bool HasMeaningfulContent(DurableActivityOutput output) + private static bool HasMeaningfulContent(DurableExecutorOutput output) { - return output.Result is not null || output.SentMessages.Count > 0; + return output.Result is not null + || output.SentMessages.Count > 0 + || output.Events.Count > 0 + || output.StateUpdates.Count > 0 + || output.ClearedScopes.Count > 0 + || output.HaltRequested; } } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs index 649d6eb676..3f78093183 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/EdgeRouters/DurableDirectEdgeRouter.cs @@ -136,8 +136,8 @@ public void RouteMessage( // so the condition function can access strongly-typed properties. // Otherwise, deserialize as a generic object for basic inspection. return targetType is null - ? JsonSerializer.Deserialize(json) - : JsonSerializer.Deserialize(json, targetType); + ? JsonSerializer.Deserialize(json, DurableSerialization.Options) + : JsonSerializer.Deserialize(json, targetType, DurableSerialization.Options); } private static void EnqueueMessage( diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IStreamingWorkflowRun.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IStreamingWorkflowRun.cs new file mode 100644 index 0000000000..e34e9b39d1 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IStreamingWorkflowRun.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.Workflows; + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Represents a workflow run that supports streaming workflow events as they occur. +/// +/// +/// This interface defines the contract for streaming workflow runs in durable execution +/// environments. Implementations provide real-time access to workflow events. +/// +public interface IStreamingWorkflowRun +{ + /// + /// Gets the unique identifier for the run. + /// + /// + /// This identifier can be provided at the start of the run, or auto-generated. + /// For durable runs, this corresponds to the orchestration instance ID. + /// + string RunId { get; } + + /// + /// Asynchronously streams workflow events as they occur during workflow execution. + /// + /// + /// This method yields instances in real time as the workflow + /// progresses. The stream completes when the workflow completes, fails, or is terminated. + /// Events are delivered in the order they are raised. + /// + /// + /// A that can be used to cancel the streaming operation. + /// If cancellation is requested, the stream will end and no further events will be yielded. + /// + /// + /// An asynchronous stream of objects representing significant + /// workflow state changes. + /// + IAsyncEnumerable WatchStreamAsync(CancellationToken cancellationToken = default); +} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IWorkflowClient.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IWorkflowClient.cs index 488a0ba2d4..e84f3fe4cd 100644 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IWorkflowClient.cs +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/IWorkflowClient.cs @@ -38,4 +38,34 @@ ValueTask RunAsync( string input, string? runId = null, CancellationToken cancellationToken = default); + + /// + /// Starts a workflow and returns a streaming handle to watch events in real-time. + /// + /// The type of the input to the workflow. + /// The workflow to execute. + /// The input to pass to the workflow's starting executor. + /// Optional identifier for the run. If not provided, a new ID will be generated. + /// A cancellation token to observe. + /// An that can be used to stream workflow events. + ValueTask StreamAsync( + Workflow workflow, + TInput input, + string? runId = null, + CancellationToken cancellationToken = default) + where TInput : notnull; + + /// + /// Starts a workflow with string input and returns a streaming handle to watch events in real-time. + /// + /// The workflow to execute. + /// The string input to pass to the workflow. + /// Optional identifier for the run. If not provided, a new ID will be generated. + /// A cancellation token to observe. + /// An that can be used to stream workflow events. + ValueTask StreamAsync( + Workflow workflow, + string input, + string? runId = null, + CancellationToken cancellationToken = default); } diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/SentMessageInfo.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/SentMessageInfo.cs deleted file mode 100644 index 46e52e74e4..0000000000 --- a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/SentMessageInfo.cs +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -using Microsoft.Agents.AI.Workflows; - -namespace Microsoft.Agents.AI.DurableTask.Workflows; - -/// -/// Information about a message sent via . -/// -internal sealed class SentMessageInfo -{ - /// - /// Gets or sets the serialized message content. - /// - public string? Message { get; set; } - - /// - /// Gets or sets the full type name of the message. - /// - public string? TypeName { get; set; } -} diff --git a/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/TypedPayload.cs b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/TypedPayload.cs new file mode 100644 index 0000000000..7c0998585a --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.DurableTask/Workflows/TypedPayload.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft. All rights reserved. + +namespace Microsoft.Agents.AI.DurableTask.Workflows; + +/// +/// Pairs a JSON-serialized payload with its assembly-qualified type name +/// for type-safe deserialization across activity boundaries. +/// +internal sealed class TypedPayload +{ + /// + /// Gets or sets the assembly-qualified type name of the payload. + /// + public string? TypeName { get; set; } + + /// + /// Gets or sets the serialized payload data as JSON. + /// + public string? Data { get; set; } +} diff --git a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctions.cs b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctions.cs index 9ceb59dd70..97c6bbcaeb 100644 --- a/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctions.cs +++ b/dotnet/src/Microsoft.Agents.AI.Hosting.AzureFunctions/BuiltInFunctions.cs @@ -48,9 +48,7 @@ public static async Task RunWorkflowOrchestrationHttpTriggerAs if (string.IsNullOrEmpty(inputMessage)) { - HttpResponseData errorResponse = req.CreateResponse(HttpStatusCode.BadRequest); - await errorResponse.WriteStringAsync("Workflow input cannot be empty."); - return errorResponse; + return await CreateErrorResponseAsync(req, context, HttpStatusCode.BadRequest, "Workflow input cannot be empty."); } DurableWorkflowInput orchestrationInput = new() { Input = inputMessage }; diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs index 0009e7b762..436e9cbc45 100644 --- a/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.IntegrationTests/WorkflowConsoleAppSamplesValidation.cs @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft. All rights reserved. +// Copyright (c) Microsoft. All rights reserved. using Xunit.Abstractions; @@ -181,6 +181,200 @@ private void AssertNoError(string line) } } + [Fact] + public async Task WorkflowEventsSampleValidationAsync() + { + using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts(); + string samplePath = Path.Combine(s_samplesPath, "05_WorkflowEvents"); + + await this.RunSampleTestAsync(samplePath, async (process, logs) => + { + bool inputSent = false; + bool foundStartedRun = false; + bool foundExecutorInvoked = false; + bool foundExecutorCompleted = false; + bool foundLookupStarted = false; + bool foundOrderFound = false; + bool foundCancelProgress = false; + bool foundOrderCancelled = false; + bool foundEmailSent = false; + bool foundYieldedOutput = false; + bool foundWorkflowCompleted = false; + bool foundCompletionResult = false; + List eventLines = []; + + string? line; + while ((line = this.ReadLogLine(logs, testTimeoutCts.Token)) != null) + { + if (!inputSent && line.Contains("Enter order ID", StringComparison.OrdinalIgnoreCase)) + { + await this.WriteInputAsync(process, "12345", testTimeoutCts.Token); + inputSent = true; + } + + if (inputSent) + { + foundStartedRun |= line.Contains("Started run:", StringComparison.Ordinal); + foundExecutorInvoked |= line.Contains("ExecutorInvokedEvent", StringComparison.Ordinal); + foundExecutorCompleted |= line.Contains("ExecutorCompletedEvent", StringComparison.Ordinal); + foundLookupStarted |= line.Contains("[Lookup] Looking up order", StringComparison.Ordinal); + foundOrderFound |= line.Contains("[Lookup] Found:", StringComparison.Ordinal); + foundCancelProgress |= line.Contains("[Cancel]", StringComparison.Ordinal) && line.Contains('%'); + foundOrderCancelled |= line.Contains("[Cancel] Done", StringComparison.Ordinal); + foundEmailSent |= line.Contains("[Email] Sent to", StringComparison.Ordinal); + foundYieldedOutput |= line.Contains("[Output]", StringComparison.Ordinal); + foundWorkflowCompleted |= line.Contains("DurableWorkflowCompletedEvent", StringComparison.Ordinal); + + if (line.Contains("Completed:", StringComparison.Ordinal)) + { + foundCompletionResult = line.Contains("12345", StringComparison.Ordinal); + break; + } + + // Collect event lines for ordering verification + if (line.Contains("[Lookup]", StringComparison.Ordinal) + || line.Contains("[Cancel]", StringComparison.Ordinal) + || line.Contains("[Email]", StringComparison.Ordinal) + || line.Contains("[Output]", StringComparison.Ordinal)) + { + eventLines.Add(line); + } + } + + this.AssertNoError(line); + } + + Assert.True(inputSent, "Input was not sent to the workflow."); + Assert.True(foundStartedRun, "Streaming run was not started."); + Assert.True(foundExecutorInvoked, "ExecutorInvokedEvent not found in stream."); + Assert.True(foundExecutorCompleted, "ExecutorCompletedEvent not found in stream."); + Assert.True(foundLookupStarted, "OrderLookupStartedEvent not found in stream."); + Assert.True(foundOrderFound, "OrderFoundEvent not found in stream."); + Assert.True(foundCancelProgress, "CancellationProgressEvent not found in stream."); + Assert.True(foundOrderCancelled, "OrderCancelledEvent not found in stream."); + Assert.True(foundEmailSent, "EmailSentEvent not found in stream."); + Assert.True(foundYieldedOutput, "WorkflowOutputEvent not found in stream."); + Assert.True(foundWorkflowCompleted, "DurableWorkflowCompletedEvent not found in stream."); + Assert.True(foundCompletionResult, "Completion result does not contain the order ID."); + + // Verify event ordering: lookup events appear before cancel events, which appear before email events + int lastLookupIndex = eventLines.FindLastIndex(l => l.Contains("[Lookup]", StringComparison.Ordinal)); + int firstCancelIndex = eventLines.FindIndex(l => l.Contains("[Cancel]", StringComparison.Ordinal)); + int lastCancelIndex = eventLines.FindLastIndex(l => l.Contains("[Cancel]", StringComparison.Ordinal)); + int firstEmailIndex = eventLines.FindIndex(l => l.Contains("[Email]", StringComparison.Ordinal)); + + if (lastLookupIndex >= 0 && firstCancelIndex >= 0) + { + Assert.True(lastLookupIndex < firstCancelIndex, "Lookup events should appear before cancel events."); + } + + if (lastCancelIndex >= 0 && firstEmailIndex >= 0) + { + Assert.True(lastCancelIndex < firstEmailIndex, "Cancel events should appear before email events."); + } + + await this.WriteInputAsync(process, "exit", testTimeoutCts.Token); + }); + } + + [Fact] + public async Task WorkflowSharedStateSampleValidationAsync() + { + using CancellationTokenSource testTimeoutCts = this.CreateTestTimeoutCts(); + string samplePath = Path.Combine(s_samplesPath, "06_WorkflowSharedState"); + + await this.RunSampleTestAsync(samplePath, async (process, logs) => + { + bool inputSent = false; + bool foundStartedRun = false; + bool foundValidateOutput = false; + bool foundEnrichOutput = false; + bool foundPaymentOutput = false; + bool foundInvoiceOutput = false; + bool foundTaxCalculation = false; + bool foundAuditTrail = false; + bool foundWorkflowCompleted = false; + List outputLines = []; + + string? line; + while ((line = this.ReadLogLine(logs, testTimeoutCts.Token)) != null) + { + if (!inputSent && line.Contains("Enter an order ID", StringComparison.OrdinalIgnoreCase)) + { + await this.WriteInputAsync(process, "ORD-001", testTimeoutCts.Token); + inputSent = true; + } + + if (inputSent) + { + foundStartedRun |= line.Contains("Started run:", StringComparison.Ordinal); + + if (line.Contains("[Output]", StringComparison.Ordinal)) + { + foundValidateOutput |= line.Contains("ValidateOrder:", StringComparison.Ordinal) && line.Contains("validated", StringComparison.OrdinalIgnoreCase); + foundEnrichOutput |= line.Contains("EnrichOrder:", StringComparison.Ordinal) && line.Contains("enriched", StringComparison.OrdinalIgnoreCase); + foundPaymentOutput |= line.Contains("ProcessPayment:", StringComparison.Ordinal) && line.Contains("Payment processed", StringComparison.OrdinalIgnoreCase); + foundInvoiceOutput |= line.Contains("GenerateInvoice:", StringComparison.Ordinal) && line.Contains("Invoice complete", StringComparison.OrdinalIgnoreCase); + + // Verify shared state: tax rate was read by ProcessPayment + foundTaxCalculation |= line.Contains("tax:", StringComparison.OrdinalIgnoreCase); + + // Verify shared state: audit trail was accumulated across executors + foundAuditTrail |= line.Contains("Audit trail:", StringComparison.Ordinal) + && line.Contains("ValidateOrder", StringComparison.Ordinal) + && line.Contains("EnrichOrder", StringComparison.Ordinal) + && line.Contains("ProcessPayment", StringComparison.Ordinal); + + outputLines.Add(line); + } + + foundWorkflowCompleted |= line.Contains("DurableWorkflowCompletedEvent", StringComparison.Ordinal) + || line.Contains("Completed:", StringComparison.Ordinal); + + if (line.Contains("Completed:", StringComparison.Ordinal)) + { + break; + } + } + + this.AssertNoError(line); + } + + Assert.True(inputSent, "Input was not sent to the workflow."); + Assert.True(foundStartedRun, "Streaming run was not started."); + Assert.True(foundValidateOutput, "ValidateOrder output not found in stream."); + Assert.True(foundEnrichOutput, "EnrichOrder output not found in stream."); + Assert.True(foundPaymentOutput, "ProcessPayment output not found in stream."); + Assert.True(foundInvoiceOutput, "GenerateInvoice output not found in stream."); + Assert.True(foundTaxCalculation, "Tax calculation (shared state read) not found."); + Assert.True(foundAuditTrail, "Audit trail (shared state accumulation) not found."); + Assert.True(foundWorkflowCompleted, "Workflow completion not found in stream."); + + // Verify output ordering: ValidateOrder -> EnrichOrder -> ProcessPayment -> GenerateInvoice + int validateIndex = outputLines.FindIndex(l => l.Contains("ValidateOrder:", StringComparison.Ordinal) && l.Contains("validated", StringComparison.OrdinalIgnoreCase)); + int enrichIndex = outputLines.FindIndex(l => l.Contains("EnrichOrder:", StringComparison.Ordinal)); + int paymentIndex = outputLines.FindIndex(l => l.Contains("ProcessPayment:", StringComparison.Ordinal)); + int invoiceIndex = outputLines.FindIndex(l => l.Contains("GenerateInvoice:", StringComparison.Ordinal)); + + if (validateIndex >= 0 && enrichIndex >= 0) + { + Assert.True(validateIndex < enrichIndex, "ValidateOrder output should appear before EnrichOrder."); + } + + if (enrichIndex >= 0 && paymentIndex >= 0) + { + Assert.True(enrichIndex < paymentIndex, "EnrichOrder output should appear before ProcessPayment."); + } + + if (paymentIndex >= 0 && invoiceIndex >= 0) + { + Assert.True(paymentIndex < invoiceIndex, "ProcessPayment output should appear before GenerateInvoice."); + } + + await this.WriteInputAsync(process, "exit", testTimeoutCts.Token); + }); + } + [Fact] public async Task WorkflowAndAgentsSampleValidationAsync() { diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs new file mode 100644 index 0000000000..ee91a33a13 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableStreamingWorkflowRunTests.cs @@ -0,0 +1,625 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Text.Json; +using Microsoft.Agents.AI.DurableTask.Workflows; +using Microsoft.Agents.AI.Workflows; +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Moq; + +namespace Microsoft.Agents.AI.DurableTask.UnitTests.Workflows; + +public sealed class DurableStreamingWorkflowRunTests +{ + private const string InstanceId = "test-instance-123"; + private const string WorkflowTestName = "TestWorkflow"; + + private static Workflow CreateTestWorkflow() => + new WorkflowBuilder(new FunctionExecutor("start", (_, _, _) => default)) + .WithName(WorkflowTestName) + .Build(); + + private static OrchestrationMetadata CreateMetadata( + OrchestrationRuntimeStatus status, + string? serializedCustomStatus = null, + string? serializedOutput = null, + TaskFailureDetails? failureDetails = null) + { + return new OrchestrationMetadata(WorkflowTestName, InstanceId) + { + RuntimeStatus = status, + SerializedCustomStatus = serializedCustomStatus, + SerializedOutput = serializedOutput, + FailureDetails = failureDetails, + }; + } + + private static string SerializeCustomStatus(List events) + { + DurableWorkflowCustomStatus status = new() { Events = events }; + return JsonSerializer.Serialize(status, DurableWorkflowJsonContext.Default.DurableWorkflowCustomStatus); + } + + private static string SerializeWorkflowResult(string? result, List events) + { + DurableWorkflowResult workflowResult = new() { Result = result, Events = events }; + string inner = JsonSerializer.Serialize(workflowResult, DurableWorkflowJsonContext.Default.DurableWorkflowResult); + return JsonSerializer.Serialize(inner); + } + + private static string SerializeEvent(WorkflowEvent evt) + { + Type eventType = evt.GetType(); + TypedPayload wrapper = new() + { + TypeName = eventType.AssemblyQualifiedName, + Data = JsonSerializer.Serialize(evt, eventType, DurableSerialization.Options) + }; + + return JsonSerializer.Serialize(wrapper, DurableWorkflowJsonContext.Default.TypedPayload); + } + + #region Constructor and Properties + + [Fact] + public void Constructor_SetsRunIdAndWorkflowName() + { + // Arrange + Mock mockClient = new("test"); + + // Act + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Assert + Assert.Equal(InstanceId, run.RunId); + Assert.Equal(WorkflowTestName, run.WorkflowName); + } + + [Fact] + public void Constructor_NoWorkflowName_SetsEmptyString() + { + // Arrange + Mock mockClient = new("test"); + Workflow workflow = new WorkflowBuilder(new FunctionExecutor("start", (_, _, _) => default)).Build(); + + // Act + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, workflow); + + // Assert + Assert.Equal(string.Empty, run.WorkflowName); + } + + #endregion + + #region GetStatusAsync + + [Theory] + [InlineData(OrchestrationRuntimeStatus.Pending, DurableRunStatus.Pending)] + [InlineData(OrchestrationRuntimeStatus.Running, DurableRunStatus.Running)] + [InlineData(OrchestrationRuntimeStatus.Completed, DurableRunStatus.Completed)] + [InlineData(OrchestrationRuntimeStatus.Failed, DurableRunStatus.Failed)] + [InlineData(OrchestrationRuntimeStatus.Terminated, DurableRunStatus.Terminated)] + [InlineData(OrchestrationRuntimeStatus.Suspended, DurableRunStatus.Suspended)] + + public async Task GetStatusAsync_MapsRuntimeStatusCorrectlyAsync( + OrchestrationRuntimeStatus runtimeStatus, + DurableRunStatus expectedStatus) + { + // Arrange + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, false, It.IsAny())) + .ReturnsAsync(CreateMetadata(runtimeStatus)); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + DurableRunStatus status = await run.GetStatusAsync(); + + // Assert + Assert.Equal(expectedStatus, status); + } + + [Fact] + public async Task GetStatusAsync_InstanceNotFound_ReturnsNotFoundAsync() + { + // Arrange + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, false, It.IsAny())) + .ReturnsAsync((OrchestrationMetadata?)null); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + DurableRunStatus status = await run.GetStatusAsync(); + + // Assert + Assert.Equal(DurableRunStatus.NotFound, status); + } + + #endregion + + #region WatchStreamAsync + + [Fact] + public async Task WatchStreamAsync_InstanceNotFound_YieldsNoEventsAsync() + { + // Arrange + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync((OrchestrationMetadata?)null); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + Assert.Empty(events); + } + + [Fact] + public async Task WatchStreamAsync_CompletedWithResult_YieldsCompletedEventAsync() + { + // Arrange + string serializedOutput = SerializeWorkflowResult("done", []); + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput)); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + Assert.Single(events); + DurableWorkflowCompletedEvent completedEvent = Assert.IsType(events[0]); + Assert.Equal("done", completedEvent.Data); + } + + [Fact] + public async Task WatchStreamAsync_CompletedWithEventsInOutput_YieldsEventsAndCompletionAsync() + { + // Arrange + DurableHaltRequestedEvent haltEvent = new("executor-1"); + string serializedEvent = SerializeEvent(haltEvent); + string serializedOutput = SerializeWorkflowResult("result", [serializedEvent]); + + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput)); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + Assert.Equal(2, events.Count); + DurableHaltRequestedEvent haltResult = Assert.IsType(events[0]); + Assert.Equal("executor-1", haltResult.ExecutorId); + DurableWorkflowCompletedEvent completedResult = Assert.IsType(events[1]); + Assert.Equal("result", completedResult.Result); + } + + [Fact] + public async Task WatchStreamAsync_CompletedWithoutWrapper_YieldsFailedEventAsync() + { + // Arrange — output not wrapped in DurableWorkflowResult (indicates a bug) + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: "\"raw output\"")); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert — yields a failed event with diagnostic message instead of crashing + Assert.Single(events); + DurableWorkflowFailedEvent failedEvent = Assert.IsType(events[0]); + Assert.Contains("could not be parsed", failedEvent.ErrorMessage); + } + + [Fact] + public async Task WatchStreamAsync_Failed_YieldsFailedEventAsync() + { + // Arrange + Mock mockClient = new("test"); + TaskFailureDetails failureDetails = new("ErrorType", "Something went wrong", null, null, null); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(CreateMetadata( + OrchestrationRuntimeStatus.Failed, + failureDetails: failureDetails)); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + Assert.Single(events); + DurableWorkflowFailedEvent failedEvent = Assert.IsType(events[0]); + Assert.Equal("Something went wrong", failedEvent.ErrorMessage); + Assert.NotNull(failedEvent.FailureDetails); + Assert.Equal("ErrorType", failedEvent.FailureDetails.ErrorType); + Assert.Equal("Something went wrong", failedEvent.FailureDetails.ErrorMessage); + } + + [Fact] + public async Task WatchStreamAsync_FailedWithNoDetails_YieldsDefaultMessageAsync() + { + // Arrange + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Failed)); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + Assert.Single(events); + DurableWorkflowFailedEvent failedEvent = Assert.IsType(events[0]); + Assert.Equal("Workflow execution failed.", failedEvent.ErrorMessage); + Assert.Null(failedEvent.FailureDetails); + } + + [Fact] + public async Task WatchStreamAsync_Terminated_YieldsFailedEventAsync() + { + // Arrange + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Terminated)); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + Assert.Single(events); + DurableWorkflowFailedEvent failedEvent = Assert.IsType(events[0]); + Assert.Equal("Workflow was terminated.", failedEvent.ErrorMessage); + Assert.Null(failedEvent.FailureDetails); + } + + [Fact] + public async Task WatchStreamAsync_EventsInCustomStatus_YieldsEventsBeforeCompletionAsync() + { + // Arrange + DurableHaltRequestedEvent haltEvent = new("exec-1"); + string serializedEvent = SerializeEvent(haltEvent); + string customStatus = SerializeCustomStatus([serializedEvent]); + string serializedOutput = SerializeWorkflowResult("final", []); + + int callCount = 0; + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(() => + { + callCount++; + if (callCount == 1) + { + return CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus); + } + + return CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput); + }); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert + Assert.Equal(2, events.Count); + DurableHaltRequestedEvent haltResult = Assert.IsType(events[0]); + Assert.Equal("exec-1", haltResult.ExecutorId); + DurableWorkflowCompletedEvent completedResult = Assert.IsType(events[1]); + Assert.Equal("final", completedResult.Result); + } + + [Fact] + public async Task WatchStreamAsync_IncrementalEvents_YieldsOnlyNewEventsPerPollAsync() + { + // Arrange — simulate 3 poll cycles where events accumulate in custom status, + // then a final completion poll. This validates: + // 1. Events arriving across multiple poll cycles are yielded incrementally + // 2. Already-seen events are not re-yielded (lastReadEventIndex dedup) + // 3. Completion event follows all streamed events + DurableHaltRequestedEvent event1 = new("executor-1"); + DurableHaltRequestedEvent event2 = new("executor-2"); + DurableHaltRequestedEvent event3 = new("executor-3"); + + string serializedEvent1 = SerializeEvent(event1); + string serializedEvent2 = SerializeEvent(event2); + string serializedEvent3 = SerializeEvent(event3); + + // Poll 1: 1 event in custom status + string customStatus1 = SerializeCustomStatus([serializedEvent1]); + // Poll 2: same event + 1 new event (accumulating list) + string customStatus2 = SerializeCustomStatus([serializedEvent1, serializedEvent2]); + // Poll 3: all 3 events accumulated + string customStatus3 = SerializeCustomStatus([serializedEvent1, serializedEvent2, serializedEvent3]); + // Poll 4: completed, all events also in output + string serializedOutput = SerializeWorkflowResult("done", [serializedEvent1, serializedEvent2, serializedEvent3]); + + int callCount = 0; + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(() => + { + callCount++; + return callCount switch + { + 1 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus1), + 2 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus2), + 3 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus3), + _ => CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput), + }; + }); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert — exactly 4 events: 3 incremental halt events + 1 completion + Assert.Equal(4, events.Count); + DurableHaltRequestedEvent halt1 = Assert.IsType(events[0]); + DurableHaltRequestedEvent halt2 = Assert.IsType(events[1]); + DurableHaltRequestedEvent halt3 = Assert.IsType(events[2]); + Assert.Equal("executor-1", halt1.ExecutorId); + Assert.Equal("executor-2", halt2.ExecutorId); + Assert.Equal("executor-3", halt3.ExecutorId); + DurableWorkflowCompletedEvent completed = Assert.IsType(events[3]); + Assert.Equal("done", completed.Data); + } + + [Fact] + public async Task WatchStreamAsync_NoNewEventsOnRepoll_DoesNotDuplicateAsync() + { + // Arrange — simulate polling where custom status doesn't change between polls, + // validating that events are not duplicated when the list is unchanged. + DurableHaltRequestedEvent event1 = new("executor-1"); + string serializedEvent1 = SerializeEvent(event1); + string customStatus = SerializeCustomStatus([serializedEvent1]); + string serializedOutput = SerializeWorkflowResult("result", [serializedEvent1]); + + int callCount = 0; + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(() => + { + callCount++; + return callCount switch + { + // First 3 polls return the same custom status (no new events after first) + <= 3 => CreateMetadata(OrchestrationRuntimeStatus.Running, serializedCustomStatus: customStatus), + _ => CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput), + }; + }); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync()) + { + events.Add(evt); + } + + // Assert — event1 appears exactly once despite 3 polls with the same status + Assert.Equal(2, events.Count); + DurableHaltRequestedEvent haltResult = Assert.IsType(events[0]); + Assert.Equal("executor-1", haltResult.ExecutorId); + DurableWorkflowCompletedEvent completedResult = Assert.IsType(events[1]); + Assert.Equal("result", completedResult.Result); + } + + [Fact] + public async Task WatchStreamAsync_Cancellation_EndsGracefullyAsync() + { + // Arrange + using CancellationTokenSource cts = new(); + int pollCount = 0; + Mock mockClient = new("test"); + mockClient.Setup(c => c.GetInstanceAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(() => + { + if (++pollCount >= 2) + { + cts.Cancel(); + } + + return CreateMetadata(OrchestrationRuntimeStatus.Running); + }); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + List events = []; + await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token)) + { + events.Add(evt); + } + + // Assert — no exception thrown, stream ends cleanly + Assert.Empty(events); + } + + #endregion + + #region WaitForCompletionAsync + + [Fact] + public async Task WaitForCompletionAsync_Completed_ReturnsResultAsync() + { + // Arrange + string serializedOutput = SerializeWorkflowResult("hello world", []); + Mock mockClient = new("test"); + mockClient.Setup(c => c.WaitForInstanceCompletionAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Completed, serializedOutput: serializedOutput)); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act + string? result = await run.WaitForCompletionAsync(); + + // Assert + Assert.Equal("hello world", result); + } + + [Fact] + public async Task WaitForCompletionAsync_Failed_ThrowsTaskFailedExceptionAsync() + { + // Arrange + Mock mockClient = new("test"); + mockClient.Setup(c => c.WaitForInstanceCompletionAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(CreateMetadata( + OrchestrationRuntimeStatus.Failed, + failureDetails: new TaskFailureDetails("Error", "kaboom", null, null, null))); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act & Assert + TaskFailedException ex = await Assert.ThrowsAsync( + () => run.WaitForCompletionAsync().AsTask()); + Assert.Equal("kaboom", ex.FailureDetails.ErrorMessage); + } + + [Fact] + public async Task WaitForCompletionAsync_UnexpectedStatus_ThrowsAsync() + { + // Arrange + Mock mockClient = new("test"); + mockClient.Setup(c => c.WaitForInstanceCompletionAsync(InstanceId, true, It.IsAny())) + .ReturnsAsync(CreateMetadata(OrchestrationRuntimeStatus.Terminated)); + + DurableStreamingWorkflowRun run = new(mockClient.Object, InstanceId, CreateTestWorkflow()); + + // Act & Assert + await Assert.ThrowsAsync( + () => run.WaitForCompletionAsync().AsTask()); + } + + #endregion + + #region ExtractResult + + [Fact] + public void ExtractResult_NullOutput_ReturnsDefault() + { + // Act + string? result = DurableStreamingWorkflowRun.ExtractResult(null); + + // Assert + Assert.Null(result); + } + + [Fact] + public void ExtractResult_WrappedStringResult_ReturnsUnwrappedString() + { + // Arrange + string serializedOutput = SerializeWorkflowResult("hello", []); + + // Act + string? result = DurableStreamingWorkflowRun.ExtractResult(serializedOutput); + + // Assert + Assert.Equal("hello", result); + } + + [Fact] + public void ExtractResult_UnwrappedOutput_ThrowsInvalidOperationException() + { + // Arrange — raw output not wrapped in DurableWorkflowResult + string serializedOutput = JsonSerializer.Serialize("raw value"); + + // Act & Assert + Assert.Throws( + () => DurableStreamingWorkflowRun.ExtractResult(serializedOutput)); + } + + [Fact] + public void ExtractResult_WrappedObjectResult_DeserializesCorrectly() + { + // Arrange + TestPayload original = new() { Name = "test", Value = 42 }; + string resultJson = JsonSerializer.Serialize(original); + string serializedOutput = SerializeWorkflowResult(resultJson, []); + + // Act + TestPayload? result = DurableStreamingWorkflowRun.ExtractResult(serializedOutput); + + // Assert + Assert.NotNull(result); + Assert.Equal("test", result.Name); + Assert.Equal(42, result.Value); + } + + [Fact] + public void ExtractResult_CamelCaseSerializedObject_DeserializesToPascalCaseMembers() + { + // Arrange — executor outputs are serialized with DurableSerialization.Options (camelCase) + TestPayload original = new() { Name = "camel", Value = 99 }; + string resultJson = JsonSerializer.Serialize(original, DurableSerialization.Options); + string serializedOutput = SerializeWorkflowResult(resultJson, []); + + // Act + TestPayload? result = DurableStreamingWorkflowRun.ExtractResult(serializedOutput); + + // Assert + Assert.NotNull(result); + Assert.Equal("camel", result.Name); + Assert.Equal(99, result.Value); + } + + #endregion + + private sealed class TestPayload + { + public string? Name { get; set; } + + public int Value { get; set; } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowContextTests.cs b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowContextTests.cs new file mode 100644 index 0000000000..437f236ee9 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.DurableTask.UnitTests/Workflows/DurableWorkflowContextTests.cs @@ -0,0 +1,504 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.Agents.AI.DurableTask.Workflows; +using Microsoft.Agents.AI.Workflows; + +namespace Microsoft.Agents.AI.DurableTask.UnitTests.Workflows; + +public sealed class DurableWorkflowContextTests +{ + private static FunctionExecutor CreateTestExecutor(string id = "test-executor") + => new(id, (_, _, _) => default); + + #region ReadStateAsync + + [Fact] + public async Task ReadStateAsync_KeyExistsInInitialState_ReturnsValue() + { + // Arrange + Dictionary state = new() { ["__default__:counter"] = "42" }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + + // Act + int? result = await context.ReadStateAsync("counter"); + + // Assert + Assert.Equal(42, result); + } + + [Fact] + public async Task ReadStateAsync_KeyDoesNotExist_ReturnsNull() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act + string? result = await context.ReadStateAsync("missing"); + + // Assert + Assert.Null(result); + } + + [Fact] + public async Task ReadStateAsync_LocalUpdateTakesPriorityOverInitialState() + { + // Arrange + Dictionary state = new() { ["__default__:key"] = "\"old\"" }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + await context.QueueStateUpdateAsync("key", "new"); + + // Act + string? result = await context.ReadStateAsync("key"); + + // Assert + Assert.Equal("new", result); + } + + [Fact] + public async Task ReadStateAsync_ScopeCleared_IgnoresInitialState() + { + // Arrange + Dictionary state = new() { ["__default__:key"] = "\"value\"" }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + await context.QueueClearScopeAsync(); + + // Act + string? result = await context.ReadStateAsync("key"); + + // Assert + Assert.Null(result); + } + + [Fact] + public async Task ReadStateAsync_WithNamedScope_ReadsFromCorrectScope() + { + // Arrange + Dictionary state = new() + { + ["scopeA:key"] = "\"fromA\"", + ["scopeB:key"] = "\"fromB\"" + }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + + // Act + string? resultA = await context.ReadStateAsync("key", "scopeA"); + string? resultB = await context.ReadStateAsync("key", "scopeB"); + + // Assert + Assert.Equal("fromA", resultA); + Assert.Equal("fromB", resultB); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + public async Task ReadStateAsync_NullOrEmptyKey_ThrowsArgumentException(string? key) + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act & Assert + await Assert.ThrowsAnyAsync(() => context.ReadStateAsync(key!).AsTask()); + } + + #endregion + + #region ReadOrInitStateAsync + + [Fact] + public async Task ReadOrInitStateAsync_KeyDoesNotExist_CallsFactoryAndQueuesUpdate() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act + string result = await context.ReadOrInitStateAsync("key", () => "initialized"); + + // Assert + Assert.Equal("initialized", result); + Assert.True(context.StateUpdates.ContainsKey("__default__:key")); + } + + [Fact] + public async Task ReadOrInitStateAsync_KeyExists_ReturnsExistingValue() + { + // Arrange + Dictionary state = new() { ["__default__:key"] = "\"existing\"" }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + bool factoryCalled = false; + + // Act + string result = await context.ReadOrInitStateAsync("key", () => + { + factoryCalled = true; + return "should-not-be-used"; + }); + + // Assert + Assert.Equal("existing", result); + Assert.False(factoryCalled); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + public async Task ReadOrInitStateAsync_NullOrEmptyKey_ThrowsArgumentException(string? key) + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act & Assert + await Assert.ThrowsAnyAsync( + () => context.ReadOrInitStateAsync(key!, () => "value").AsTask()); + } + + [Fact] + public async Task ReadOrInitStateAsync_ValueType_MissingKey_CallsFactory() + { + // Arrange + // Validates that ReadStateAsync returns null (not 0) for missing keys, + // because the return type is int? (Nullable). This ensures the factory + // is correctly invoked for value types when the key does not exist. + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act + int result = await context.ReadOrInitStateAsync("counter", () => 42); + + // Assert + Assert.Equal(42, result); + Assert.True(context.StateUpdates.ContainsKey("__default__:counter")); + } + + [Fact] + public async Task ReadOrInitStateAsync_NullFactory_ThrowsArgumentNullException() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act & Assert + await Assert.ThrowsAsync( + () => context.ReadOrInitStateAsync("key", null!).AsTask()); + } + + #endregion + + #region QueueStateUpdateAsync + + [Fact] + public async Task QueueStateUpdateAsync_SetsValue_VisibleToSubsequentRead() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act + await context.QueueStateUpdateAsync("key", "hello"); + string? result = await context.ReadStateAsync("key"); + + // Assert + Assert.Equal("hello", result); + } + + [Fact] + public async Task QueueStateUpdateAsync_NullValue_RecordsDeletion() + { + // Arrange + Dictionary state = new() { ["__default__:key"] = "\"value\"" }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + + // Act + await context.QueueStateUpdateAsync("key", null); + + // Assert + Assert.True(context.StateUpdates.ContainsKey("__default__:key")); + Assert.Null(context.StateUpdates["__default__:key"]); + } + + [Theory] + [InlineData(null)] + [InlineData("")] + public async Task QueueStateUpdateAsync_NullOrEmptyKey_ThrowsArgumentException(string? key) + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act & Assert + await Assert.ThrowsAnyAsync( + () => context.QueueStateUpdateAsync(key!, "value").AsTask()); + } + + #endregion + + #region QueueClearScopeAsync + + [Fact] + public async Task QueueClearScopeAsync_DefaultScope_ClearsStateAndPendingUpdates() + { + // Arrange + Dictionary state = new() { ["__default__:key"] = "\"value\"" }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + await context.QueueStateUpdateAsync("pending", "data"); + + // Act + await context.QueueClearScopeAsync(); + + // Assert + Assert.Contains("__default__", context.ClearedScopes); + Assert.Empty(context.StateUpdates); + } + + [Fact] + public async Task QueueClearScopeAsync_NamedScope_OnlyClearsThatScope() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + await context.QueueStateUpdateAsync("keyA", "valueA", scopeName: "scopeA"); + await context.QueueStateUpdateAsync("keyB", "valueB", scopeName: "scopeB"); + + // Act + await context.QueueClearScopeAsync("scopeA"); + + // Assert + Assert.DoesNotContain("scopeA:keyA", context.StateUpdates.Keys); + Assert.Contains("scopeB:keyB", context.StateUpdates.Keys); + } + + #endregion + + #region ReadStateKeysAsync + + [Fact] + public async Task ReadStateKeysAsync_ReturnsKeysFromInitialState() + { + // Arrange + Dictionary state = new() + { + ["__default__:alpha"] = "\"a\"", + ["__default__:beta"] = "\"b\"" + }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + + // Act + HashSet keys = await context.ReadStateKeysAsync(); + + // Assert + Assert.Equal(2, keys.Count); + Assert.Contains("alpha", keys); + Assert.Contains("beta", keys); + } + + [Fact] + public async Task ReadStateKeysAsync_MergesLocalUpdatesAndDeletions() + { + // Arrange + Dictionary state = new() + { + ["__default__:existing"] = "\"val\"", + ["__default__:toDelete"] = "\"val\"" + }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + await context.QueueStateUpdateAsync("newKey", "value"); + await context.QueueStateUpdateAsync("toDelete", null); + + // Act + HashSet keys = await context.ReadStateKeysAsync(); + + // Assert + Assert.Contains("existing", keys); + Assert.Contains("newKey", keys); + Assert.DoesNotContain("toDelete", keys); + } + + [Fact] + public async Task ReadStateKeysAsync_AfterClearScope_ExcludesInitialState() + { + // Arrange + Dictionary state = new() { ["__default__:old"] = "\"val\"" }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + await context.QueueClearScopeAsync(); + await context.QueueStateUpdateAsync("new", "value"); + + // Act + HashSet keys = await context.ReadStateKeysAsync(); + + // Assert + Assert.DoesNotContain("old", keys); + Assert.Contains("new", keys); + } + + [Fact] + public async Task ReadStateKeysAsync_WithNamedScope_OnlyReturnsKeysFromThatScope() + { + // Arrange + Dictionary state = new() + { + ["scopeA:key1"] = "\"val\"", + ["scopeB:key2"] = "\"val\"" + }; + DurableWorkflowContext context = new(state, CreateTestExecutor()); + + // Act + HashSet keysA = await context.ReadStateKeysAsync("scopeA"); + + // Assert + Assert.Single(keysA); + Assert.Contains("key1", keysA); + } + + #endregion + + #region AddEventAsync + + [Fact] + public async Task AddEventAsync_AddsEventToCollection() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + WorkflowEvent evt = new ExecutorInvokedEvent("test", "test-data"); + + // Act + await context.AddEventAsync(evt); + + // Assert + Assert.Single(context.OutboundEvents); + Assert.Same(evt, context.OutboundEvents[0]); + } + + [Fact] + public async Task AddEventAsync_NullEvent_DoesNotAdd() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act +#pragma warning disable CS8625 // Cannot convert null literal to non-nullable reference type. + await context.AddEventAsync(null); +#pragma warning restore CS8625 + + // Assert + Assert.Empty(context.OutboundEvents); + } + + #endregion + + #region SendMessageAsync + + [Fact] + public async Task SendMessageAsync_SerializesMessageWithTypeName() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act + await context.SendMessageAsync("hello"); + + // Assert + Assert.Single(context.SentMessages); + Assert.Equal(typeof(string).AssemblyQualifiedName, context.SentMessages[0].TypeName); + Assert.NotNull(context.SentMessages[0].Data); + } + + [Fact] + public async Task SendMessageAsync_NullMessage_DoesNotAdd() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act +#pragma warning disable CS8625 // Cannot convert null literal to non-nullable reference type. + await context.SendMessageAsync(null); +#pragma warning restore CS8625 + + // Assert + Assert.Empty(context.SentMessages); + } + + #endregion + + #region YieldOutputAsync + + [Fact] + public async Task YieldOutputAsync_AddsWorkflowOutputEvent() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act + await context.YieldOutputAsync("result"); + + // Assert + Assert.Single(context.OutboundEvents); + WorkflowOutputEvent outputEvent = Assert.IsType(context.OutboundEvents[0]); + Assert.Equal("result", outputEvent.Data); + } + + [Fact] + public async Task YieldOutputAsync_NullOutput_DoesNotAdd() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act +#pragma warning disable CS8625 // Cannot convert null literal to non-nullable reference type. + await context.YieldOutputAsync(null); +#pragma warning restore CS8625 + + // Assert + Assert.Empty(context.OutboundEvents); + } + + #endregion + + #region RequestHaltAsync + + [Fact] + public async Task RequestHaltAsync_SetsHaltRequestedAndAddsEvent() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Act + await context.RequestHaltAsync(); + + // Assert + Assert.True(context.HaltRequested); + Assert.Single(context.OutboundEvents); + Assert.IsType(context.OutboundEvents[0]); + } + + #endregion + + #region Properties + + [Fact] + public void TraceContext_ReturnsNull() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Assert + Assert.Null(context.TraceContext); + } + + [Fact] + public void ConcurrentRunsEnabled_ReturnsFalse() + { + // Arrange + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Assert + Assert.False(context.ConcurrentRunsEnabled); + } + + [Fact] + public async Task Constructor_NullInitialState_CreatesEmptyState() + { + // Arrange & Act + DurableWorkflowContext context = new(null, CreateTestExecutor()); + + // Assert + string? result = await context.ReadStateAsync("anything"); + Assert.Null(result); + } + + #endregion +}