Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dotnet/agent-framework-dotnet.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
<Project Path="samples/Durable/Workflow/ConsoleApps/02_ConcurrentWorkflow/02_ConcurrentWorkflow.csproj" />
<Project Path="samples/Durable/Workflow/ConsoleApps/03_ConditionalEdges/03_ConditionalEdges.csproj" />
<Project Path="samples/Durable/Workflow/ConsoleApps/04_WorkflowAndAgents/04_WorkflowAndAgents.csproj" />
<Project Path="samples/Durable/Workflow/ConsoleApps/05_WorkflowEvents/05_WorkflowEvents.csproj" />
<Project Path="samples/Durable/Workflow/ConsoleApps/06_WorkflowSharedState/06_WorkflowSharedState.csproj" />
</Folder>
<Folder Name="/Samples/Durable/Workflows/AzureFunctions/">
<Project Path="samples/Durable/Workflow/AzureFunctions/01_SequentialWorkflow/01_SequentialWorkflow.csproj" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net10.0</TargetFrameworks>
<OutputType>Exe</OutputType>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<AssemblyName>WorkflowEvents</AssemblyName>
<RootNamespace>WorkflowEvents</RootNamespace>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" />
<PackageReference Include="Microsoft.DurableTask.Client.AzureManaged" />
<PackageReference Include="Microsoft.DurableTask.Worker.AzureManaged" />
<PackageReference Include="Microsoft.Extensions.Hosting" />
</ItemGroup>

<!-- Local projects that should be switched to package references when using the sample outside of this MAF repo -->
<!--
<ItemGroup>
<PackageReference Include="Microsoft.Agents.AI.DurableTask" />
<PackageReference Include="Microsoft.Agents.AI.Workflows" />
</ItemGroup>
-->
<ItemGroup>
<ProjectReference Include="..\..\..\..\..\src\Microsoft.Agents.AI.DurableTask\Microsoft.Agents.AI.DurableTask.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) Microsoft. All rights reserved.

using Microsoft.Agents.AI.Workflows;

namespace WorkflowEvents;

// ═══════════════════════════════════════════════════════════════════════════════
// Custom event types - callers observe these via WatchStreamAsync
// ═══════════════════════════════════════════════════════════════════════════════

internal sealed class OrderLookupStartedEvent(string orderId) : WorkflowEvent(orderId)
{
public string OrderId { get; } = orderId;
}

internal sealed class OrderFoundEvent(string customerName) : WorkflowEvent(customerName)
{
public string CustomerName { get; } = customerName;
}

internal sealed class CancellationProgressEvent(int percentComplete, string status) : WorkflowEvent(status)
{
public int PercentComplete { get; } = percentComplete;
public string Status { get; } = status;
}

internal sealed class OrderCancelledEvent() : WorkflowEvent("Order cancelled");

internal sealed class EmailSentEvent(string email) : WorkflowEvent(email)
{
public string Email { get; } = email;
}

// ═══════════════════════════════════════════════════════════════════════════════
// Domain models
// ═══════════════════════════════════════════════════════════════════════════════

internal sealed record Order(string Id, DateTime OrderDate, bool IsCancelled, string? CancelReason, Customer Customer);

internal sealed record Customer(string Name, string Email);

// ═══════════════════════════════════════════════════════════════════════════════
// Executors - emit events via AddEventAsync and YieldOutputAsync
// ═══════════════════════════════════════════════════════════════════════════════

/// <summary>
/// Looks up an order by ID, emitting progress events.
/// </summary>
internal sealed class OrderLookup() : Executor<string, Order>("OrderLookup")
{
public override async ValueTask<Order> HandleAsync(
string message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
await context.AddEventAsync(new OrderLookupStartedEvent(message), cancellationToken);

// Simulate database lookup
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);

Order order = new(
Id: message,
OrderDate: DateTime.UtcNow.AddDays(-1),
IsCancelled: false,
CancelReason: "Customer requested cancellation",
Customer: new Customer(Name: "Jerry", Email: "jerry@example.com"));

await context.AddEventAsync(new OrderFoundEvent(order.Customer.Name), cancellationToken);

// YieldOutputAsync emits a WorkflowOutputEvent observable via streaming
await context.YieldOutputAsync(order, cancellationToken);

return order;
}
}

/// <summary>
/// Cancels an order, emitting progress events during the multi-step process.
/// </summary>
internal sealed class OrderCancel() : Executor<Order, Order>("OrderCancel")
{
public override async ValueTask<Order> HandleAsync(
Order message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
await context.AddEventAsync(new CancellationProgressEvent(0, "Starting cancellation"), cancellationToken);

// Simulate a multi-step cancellation process
await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
await context.AddEventAsync(new CancellationProgressEvent(33, "Contacting payment provider"), cancellationToken);

await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
await context.AddEventAsync(new CancellationProgressEvent(66, "Processing refund"), cancellationToken);

await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);

Order cancelledOrder = message with { IsCancelled = true };
await context.AddEventAsync(new CancellationProgressEvent(100, "Complete"), cancellationToken);
await context.AddEventAsync(new OrderCancelledEvent(), cancellationToken);

await context.YieldOutputAsync(cancelledOrder, cancellationToken);

return cancelledOrder;
}
}

/// <summary>
/// Sends a cancellation confirmation email, emitting an event on completion.
/// </summary>
internal sealed class SendEmail() : Executor<Order, string>("SendEmail")
{
public override async ValueTask<string> HandleAsync(
Order message,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// Simulate sending email
await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);

string result = $"Cancellation email sent for order {message.Id} to {message.Customer.Email}.";

await context.AddEventAsync(new EmailSentEvent(message.Customer.Email), cancellationToken);

await context.YieldOutputAsync(result, cancellationToken);

return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) Microsoft. All rights reserved.

// ═══════════════════════════════════════════════════════════════════════════════
// SAMPLE: Workflow Events and Streaming
// ═══════════════════════════════════════════════════════════════════════════════
//
// This sample demonstrates how to use IWorkflowContext event methods in executors
// and stream events from the caller side:
//
// 1. AddEventAsync - Emit custom events that callers can observe in real-time
// 2. StreamAsync - Start a workflow and obtain a streaming handle
// 3. WatchStreamAsync - Observe events as they occur (custom, framework, and terminal)
//
// The sample uses IWorkflowClient.StreamAsync to start a workflow and
// WatchStreamAsync to observe events as they occur in real-time.
//
// Workflow: OrderLookup -> OrderCancel -> SendEmail
// ═══════════════════════════════════════════════════════════════════════════════

using Microsoft.Agents.AI.DurableTask;
using Microsoft.Agents.AI.DurableTask.Workflows;
using Microsoft.Agents.AI.Workflows;
using Microsoft.DurableTask.Client.AzureManaged;
using Microsoft.DurableTask.Worker.AzureManaged;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using WorkflowEvents;

// Get DTS connection string from environment variable
string dtsConnectionString = Environment.GetEnvironmentVariable("DURABLE_TASK_SCHEDULER_CONNECTION_STRING")
?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None";

// Define executors and build workflow
OrderLookup orderLookup = new();
OrderCancel orderCancel = new();
SendEmail sendEmail = new();

Workflow cancelOrder = new WorkflowBuilder(orderLookup)
.WithName("CancelOrder")
.WithDescription("Cancel an order and notify the customer")
.AddEdge(orderLookup, orderCancel)
.AddEdge(orderCancel, sendEmail)
.Build();

// Configure host with durable workflow support
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureLogging(logging => logging.SetMinimumLevel(LogLevel.Warning))
.ConfigureServices(services =>
{
services.ConfigureDurableWorkflows(
workflowOptions => workflowOptions.AddWorkflow(cancelOrder),
workerBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString),
clientBuilder: builder => builder.UseDurableTaskScheduler(dtsConnectionString));
})
.Build();

await host.StartAsync();

IWorkflowClient workflowClient = host.Services.GetRequiredService<IWorkflowClient>();

Console.WriteLine("Workflow Events Demo - Enter order ID (or 'exit'):");

while (true)
{
Console.Write("> ");
string? input = Console.ReadLine();
if (string.IsNullOrWhiteSpace(input) || input.Equals("exit", StringComparison.OrdinalIgnoreCase))
{
break;
}

try
{
await RunWorkflowWithStreamingAsync(input, cancelOrder, workflowClient);
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}

Console.WriteLine();
}

await host.StopAsync();

// Runs a workflow and streams events as they occur
static async Task RunWorkflowWithStreamingAsync(string orderId, Workflow workflow, IWorkflowClient client)
{
// StreamAsync starts the workflow and returns a streaming handle for observing events
IStreamingWorkflowRun run = await client.StreamAsync(workflow, orderId);
Console.WriteLine($"Started run: {run.RunId}");

// WatchStreamAsync yields events as they're emitted by executors
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
Console.WriteLine($" New event received at {DateTime.Now:HH:mm:ss.ffff} ({evt.GetType().Name})");

switch (evt)
{
// Custom domain events (emitted via AddEventAsync)
case OrderLookupStartedEvent e:
WriteColored($" [Lookup] Looking up order {e.OrderId}", ConsoleColor.Cyan);
break;
case OrderFoundEvent e:
WriteColored($" [Lookup] Found: {e.CustomerName}", ConsoleColor.Cyan);
break;
case CancellationProgressEvent e:
WriteColored($" [Cancel] {e.PercentComplete}% - {e.Status}", ConsoleColor.Yellow);
break;
case OrderCancelledEvent:
WriteColored(" [Cancel] Done", ConsoleColor.Yellow);
break;
case EmailSentEvent e:
WriteColored($" [Email] Sent to {e.Email}", ConsoleColor.Magenta);
break;

case WorkflowOutputEvent e:
WriteColored($" [Output] {e.SourceId}", ConsoleColor.DarkGray);
break;

// Workflow completion
case DurableWorkflowCompletedEvent e:
WriteColored($" Completed: {e.Result}", ConsoleColor.Green);
break;
case DurableWorkflowFailedEvent e:
WriteColored($" Failed: {e.ErrorMessage}", ConsoleColor.Red);
break;
}
}
}

static void WriteColored(string message, ConsoleColor color)
{
Console.ForegroundColor = color;
Console.WriteLine(message);
Console.ResetColor();
}
Loading