Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0a95746
feat: add pipeline behavior interfaces and contracts
Feb 17, 2026
0db8c19
feat: wire pipeline behaviors into workflow execution
Feb 17, 2026
74c9689
test: add unit tests and docs for pipeline behaviors
Feb 17, 2026
9bfcb22
.NET Workflows: Remove unused ExecutorStage.PostExecution enum value
Feb 17, 2026
560f9d4
.NET Workflows: Fix null configure silently ignored in WithBehaviors
Feb 17, 2026
2408c8f
.NET Workflows: Avoid async state machine allocation in BeginStreamAsync
Feb 17, 2026
91fc271
.NET Workflows: Fix RunId correlation in ExecutorBehaviorContext
Feb 17, 2026
dde1d04
.NET Workflows: Wire ExecuteWorkflowEndBehaviorsAsync into workflow t…
Feb 17, 2026
6b8589c
Bug 6: Use required keyword on ExecutorBehaviorContext properties
Feb 17, 2026
866f6ab
Bug 7: Add non-generic ExecuteWorkflowPipelineAsync overload
Feb 17, 2026
288e19b
Bug 8: Add tests for Ending stage and RunId consistency
Feb 17, 2026
a911137
Bug 9: Use Throw.IfNull in BehaviorExecutionException
Feb 17, 2026
651f4a8
Rename WorkflowBehaviorIntegrationTests to WorkflowBehaviorEndToEndTests
Feb 17, 2026
8474c89
Add comment explaining CancellationToken.None for end behaviors
Feb 17, 2026
31a3031
Bug 11: Use Throw.IfNull in WorkflowBehaviorOptions
Feb 17, 2026
9e88049
Bug 12: Guard against double-registration in SetRunEndingCallback
Feb 17, 2026
0497bc7
Bug 13: Add combined workflow + executor behavior end-to-end test
Feb 17, 2026
dac3985
Bug 14: Document continuation call semantics in IExecutorBehavior
Feb 17, 2026
046a794
Bug 15: Use Throw.IfNullOrEmpty for behaviorType and stage
Feb 17, 2026
9018cbf
Bug 16: Test that finalHandler exception is not wrapped without behav…
Feb 17, 2026
e26411e
Bug 17: Validate BehaviorType and Stage in exception test
Feb 17, 2026
42c48e9
Bug 18: Clarify TResult contract and continuation semantics in IWorkf…
Feb 17, 2026
8a61c4c
Bug 19: Distinguish ExecutorId from ExecutorType in ExecutorBehaviorC…
Feb 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Agents.AI.Workflows.Behaviors;

/// <summary>
/// Exception thrown when a behavior fails during execution.
/// </summary>
public sealed class BehaviorExecutionException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="BehaviorExecutionException"/> class.
/// </summary>
public BehaviorExecutionException()
: base("Error executing behavior")
{
this.BehaviorType = string.Empty;
this.Stage = string.Empty;
}

/// <summary>
/// Initializes a new instance of the <see cref="BehaviorExecutionException"/> class with a specified error message.
/// </summary>
/// <param name="message">The error message.</param>
public BehaviorExecutionException(string message)
: base(message)
{
this.BehaviorType = string.Empty;
this.Stage = string.Empty;
}

/// <summary>
/// Initializes a new instance of the <see cref="BehaviorExecutionException"/> class with a specified error message and inner exception.
/// </summary>
/// <param name="message">The error message.</param>
/// <param name="innerException">The exception that caused this exception.</param>
public BehaviorExecutionException(string message, Exception innerException)
: base(message, innerException)
{
this.BehaviorType = string.Empty;
this.Stage = string.Empty;
}

/// <summary>
/// Initializes a new instance of the <see cref="BehaviorExecutionException"/> class.
/// </summary>
/// <param name="behaviorType">The type name of the behavior that failed.</param>
/// <param name="stage">The stage at which the behavior failed.</param>
/// <param name="innerException">The exception that caused the behavior to fail.</param>
public BehaviorExecutionException(string behaviorType, string stage, Exception innerException)
: base($"Error executing behavior '{behaviorType}' at stage '{stage}'", innerException)
{
Throw.IfNull(innerException);
this.BehaviorType = Throw.IfNullOrEmpty(behaviorType);
this.Stage = Throw.IfNullOrEmpty(stage);
}

/// <summary>
/// Gets the type name of the behavior that failed.
/// </summary>
public string BehaviorType { get; }

/// <summary>
/// Gets the stage at which the behavior failed.
/// </summary>
public string Stage { get; }
}
165 changes: 165 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/Behaviors/BehaviorPipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Agents.AI.Workflows.Behaviors;

/// <summary>
/// Internal class that manages the execution of behavior pipelines for workflows and executors.
/// </summary>
internal sealed class BehaviorPipeline
{
private readonly List<IExecutorBehavior> _executorBehaviors;
private readonly List<IWorkflowBehavior> _workflowBehaviors;

/// <summary>
/// Initializes a new instance of the <see cref="BehaviorPipeline"/> class.
/// </summary>
/// <param name="executorBehaviors">The collection of executor behaviors to execute.</param>
/// <param name="workflowBehaviors">The collection of workflow behaviors to execute.</param>
public BehaviorPipeline(
IEnumerable<IExecutorBehavior> executorBehaviors,
IEnumerable<IWorkflowBehavior> workflowBehaviors)
{
this._executorBehaviors = executorBehaviors.ToList();
this._workflowBehaviors = workflowBehaviors.ToList();
}

/// <summary>
/// Gets a value indicating whether any executor behaviors are registered.
/// </summary>
public bool HasExecutorBehaviors => this._executorBehaviors.Count > 0;

/// <summary>
/// Gets a value indicating whether any workflow behaviors are registered.
/// </summary>
public bool HasWorkflowBehaviors => this._workflowBehaviors.Count > 0;

/// <summary>
/// Executes the executor behavior pipeline.
/// </summary>
/// <param name="context">The context for the executor execution.</param>
/// <param name="finalHandler">The final handler to execute after all behaviors.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The result of the executor execution.</returns>
public async ValueTask<object?> ExecuteExecutorPipelineAsync(
ExecutorBehaviorContext context,
Func<CancellationToken, ValueTask<object?>> finalHandler,
CancellationToken cancellationToken)
{
if (this._executorBehaviors.Count == 0)
{
return await finalHandler(cancellationToken).ConfigureAwait(false);
}

// Build chain from end to start (reverse order)
ExecutorBehaviorContinuation pipeline = new(finalHandler);

for (int i = this._executorBehaviors.Count - 1; i >= 0; i--)
{
var behavior = this._executorBehaviors[i];
var continuation = pipeline;
pipeline = new ExecutorBehaviorContinuation((ct) => ExecuteBehaviorWithErrorHandlingAsync(behavior, context, continuation, ct));
}

return await pipeline(cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Executes the workflow behavior pipeline with no return value.
/// </summary>
/// <param name="context">The context for the workflow execution.</param>
/// <param name="finalHandler">The final handler to execute after all behaviors.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public async ValueTask ExecuteWorkflowPipelineAsync(
WorkflowBehaviorContext context,
Func<CancellationToken, ValueTask> finalHandler,
CancellationToken cancellationToken)
{
await this.ExecuteWorkflowPipelineAsync<int>(
context,
async ct => { await finalHandler(ct).ConfigureAwait(false); return 0; },
cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Executes the workflow behavior pipeline.
/// </summary>
/// <typeparam name="TResult">The result type of the workflow operation.</typeparam>
/// <param name="context">The context for the workflow execution.</param>
/// <param name="finalHandler">The final handler to execute after all behaviors.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The result of the workflow execution.</returns>
public async ValueTask<TResult> ExecuteWorkflowPipelineAsync<TResult>(
WorkflowBehaviorContext context,
Func<CancellationToken, ValueTask<TResult>> finalHandler,
CancellationToken cancellationToken)
{
if (this._workflowBehaviors.Count == 0)
{
return await finalHandler(cancellationToken).ConfigureAwait(false);
}

// Build chain from end to start (reverse order)
WorkflowBehaviorContinuation<TResult> pipeline = new(finalHandler);

for (int i = this._workflowBehaviors.Count - 1; i >= 0; i--)
{
var behavior = this._workflowBehaviors[i];
var continuation = pipeline;
pipeline = new WorkflowBehaviorContinuation<TResult>((ct) => ExecuteBehaviorWithErrorHandlingAsync(behavior, context, continuation, ct));
}

return await pipeline(cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Executes an executor behavior with error handling.
/// </summary>
private static async ValueTask<object?> ExecuteBehaviorWithErrorHandlingAsync(
IExecutorBehavior behavior,
ExecutorBehaviorContext context,
ExecutorBehaviorContinuation continuation,
CancellationToken cancellationToken)
{
try
{
return await behavior.HandleAsync(context, continuation, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex) when (ex is not BehaviorExecutionException)
{
throw new BehaviorExecutionException(
behavior.GetType().FullName ?? "Unknown",
context.Stage.ToString(),
ex
);
}
}

/// <summary>
/// Executes a workflow behavior with error handling.
/// </summary>
private static async ValueTask<TResult> ExecuteBehaviorWithErrorHandlingAsync<TResult>(
IWorkflowBehavior behavior,
WorkflowBehaviorContext context,
WorkflowBehaviorContinuation<TResult> continuation,
CancellationToken cancellationToken)
{
try
{
return await behavior.HandleAsync(context, continuation, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex) when (ex is not BehaviorExecutionException)
{
throw new BehaviorExecutionException(
behavior.GetType().FullName ?? "Unknown",
context.Stage.ToString(),
ex
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Agents.AI.Workflows.Behaviors;

/// <summary>
/// Represents a behavior that wraps executor step execution, allowing custom logic before and after executor operations.
/// </summary>
/// <remarks>
/// Implement this interface to add cross-cutting concerns like logging, telemetry, validation, or performance monitoring
/// at the executor level. Multiple behaviors can be chained together to form a pipeline.
/// Behaviors execute once per executor invocation. Logic placed before <c>await continuation()</c> runs before the executor;
/// logic placed after runs once the executor (and any subsequent behaviors) has completed.
/// </remarks>
public interface IExecutorBehavior
{
/// <summary>
/// Handles executor execution with the ability to execute logic before and after the next behavior in the pipeline.
/// </summary>
/// <param name="context">The context containing information about the current executor execution.</param>
/// <param name="continuation">The delegate to invoke the next behavior in the pipeline or the actual executor operation.
/// Should be called exactly once. Calling it multiple times will re-execute downstream behaviors and the executor.
/// Logic placed before the call runs before the executor; logic placed after runs once the executor completes.</param>
/// <param name="cancellationToken">The cancellation token to monitor for cancellation requests.</param>
/// <returns>A task representing the asynchronous operation, with the result of the executor operation.</returns>
ValueTask<object?> HandleAsync(
ExecutorBehaviorContext context,
ExecutorBehaviorContinuation continuation,
CancellationToken cancellationToken = default);
}

/// <summary>
/// Represents the continuation in the executor behavior pipeline.
/// </summary>
/// <param name="cancellationToken">The cancellation token to monitor for cancellation requests.</param>
/// <returns>A task representing the asynchronous operation with the executor result.</returns>
public delegate ValueTask<object?> ExecutorBehaviorContinuation(CancellationToken cancellationToken);

/// <summary>
/// Provides context information for executor behaviors.
/// </summary>
public sealed class ExecutorBehaviorContext
{
/// <summary>
/// Gets the string identifier assigned to the executor being invoked.
/// This is the logical name used to register and route messages to the executor,
/// and is distinct from <see cref="ExecutorType"/>, which represents the executor's CLR type.
/// </summary>
public string ExecutorId { get; init; } = string.Empty;

/// <summary>
/// Gets the type of the executor being invoked.
/// </summary>
public required Type ExecutorType { get; init; }

/// <summary>
/// Gets the message being processed by the executor.
/// </summary>
public required object Message { get; init; }

/// <summary>
/// Gets the type of the message being processed.
/// </summary>
public required Type MessageType { get; init; }

/// <summary>
/// Gets the unique identifier for the workflow execution run.
/// </summary>
public string RunId { get; init; } = string.Empty;

/// <summary>
/// Gets the stage of executor execution.
/// </summary>
public ExecutorStage Stage { get; init; }

/// <summary>
/// Gets the workflow context for this execution.
/// </summary>
public required IWorkflowContext WorkflowContext { get; init; }

/// <summary>
/// Gets the trace context for distributed tracing.
/// </summary>
public IReadOnlyDictionary<string, string>? TraceContext { get; init; }

/// <summary>
/// Gets optional custom properties that can be used to pass additional context.
/// </summary>
public IReadOnlyDictionary<string, object>? Properties { get; init; }
}

/// <summary>
/// Represents the stage of executor execution.
/// </summary>
public enum ExecutorStage
{
/// <summary>
/// Before the executor begins processing the message. Behaviors are invoked once per executor call.
/// To perform logic after the executor completes, place code after the <c>await continuation()</c> call
/// in <see cref="IExecutorBehavior.HandleAsync"/>.
/// </summary>
PreExecution
}
Loading