-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Description
Feature Request: Pipeline Behavior Extension Points for Workflow Framework
Problem Statement
The Microsoft.Agents.AI.Workflows framework currently lacks extensibility hooks for cross-cutting concerns at the workflow and executor execution levels. While an event system exists (ExecutorInvokedEvent, SuperStepStartedEvent, etc.), events are fire-and-forget and cannot:
- Intercept or modify execution flow
- Short-circuit execution based on validation
- Measure execution time and inject telemetry
- Transform inputs/outputs
- Implement retry or caching logic
- Add authentication/authorization checks
This limitation forces developers to either:
- Fork and modify framework internals
- Implement workarounds with reflection
- Duplicate cross-cutting logic across every executor
Proposed Solution
Add pipeline behavior extension points that allow developers to inject custom logic before and after:
- Workflow start/end - When workflow execution begins and completes
- Executor step start/end - When individual executors process messages
This enables a middleware-style pattern (similar to ASP.NET Core or MediatR's IPipelineBehavior) where multiple behaviors can be chained together.
API Design
1. Workflow-Level Behaviors
public interface IWorkflowBehavior
{
ValueTask<TResult> HandleAsync<TResult>(
WorkflowBehaviorContext context,
WorkflowBehaviorDelegate<TResult> next,
CancellationToken cancellationToken);
}
public class WorkflowBehaviorContext
{
public string WorkflowName { get; init; }
public string? WorkflowDescription { get; init; }
public string RunId { get; init; }
public string StartExecutorId { get; init; }
public WorkflowStage Stage { get; init; } // Starting or Ending
public IDictionary<string, object>? Properties { get; init; }
}2. Executor-Level Behaviors
public interface IExecutorBehavior
{
ValueTask<object?> HandleAsync(
ExecutorBehaviorContext context,
ExecutorBehaviorDelegate next,
CancellationToken cancellationToken);
}
public class ExecutorBehaviorContext
{
public string ExecutorId { get; init; }
public Type ExecutorType { get; init; }
public object Message { get; init; }
public Type MessageType { get; init; }
public string RunId { get; init; }
public ExecutorStage Stage { get; init; } // PreExecution or PostExecution
public IWorkflowContext WorkflowContext { get; init; }
public TraceContext TraceContext { get; init; }
public IDictionary<string, object>? Properties { get; init; }
}3. Registration API
var workflow = new WorkflowBuilder(startExecutor)
.WithBehaviors(options =>
{
options.AddExecutorBehavior(new LoggingBehavior(logger));
options.AddExecutorBehavior(new ValidationBehavior());
options.AddWorkflowBehavior(new TelemetryBehavior());
})
.AddEdge(executor1, executor2)
.Build();Use Cases
1. Performance Monitoring
public class PerformanceMonitoringBehavior : IExecutorBehavior
{
public async ValueTask<object?> HandleAsync(
ExecutorBehaviorContext context,
ExecutorBehaviorDelegate next,
CancellationToken cancellationToken)
{
var stopwatch = Stopwatch.StartNew();
try
{
return await next(cancellationToken);
}
finally
{
_metrics.RecordExecutorDuration(
context.ExecutorId,
stopwatch.ElapsedMilliseconds);
}
}
}2. Input Validation
public class ValidationBehavior : IExecutorBehavior
{
public async ValueTask<object?> HandleAsync(
ExecutorBehaviorContext context,
ExecutorBehaviorDelegate next,
CancellationToken cancellationToken)
{
if (context.Stage == ExecutorStage.PreExecution)
{
ValidateMessage(context.Message);
}
return await next(cancellationToken);
}
}3. Distributed Tracing
public class TracingBehavior : IWorkflowBehavior
{
public async ValueTask<TResult> HandleAsync<TResult>(
WorkflowBehaviorContext context,
WorkflowBehaviorDelegate<TResult> next,
CancellationToken cancellationToken)
{
using var span = _tracer.StartSpan(
$"workflow.{context.Stage}",
new { workflowName = context.WorkflowName, runId = context.RunId });
return await next(cancellationToken);
}
}4. Caching
public class CachingBehavior : IExecutorBehavior
{
public async ValueTask<object?> HandleAsync(
ExecutorBehaviorContext context,
ExecutorBehaviorDelegate next,
CancellationToken cancellationToken)
{
var cacheKey = GetCacheKey(context);
if (_cache.TryGetValue(cacheKey, out var cached))
return cached;
var result = await next(cancellationToken);
_cache.Set(cacheKey, result);
return result;
}
}Benefits
1. Separation of Concerns
- Keep business logic in executors
- Extract cross-cutting concerns to behaviors
- Single Responsibility Principle
2. Composability
- Chain multiple behaviors together
- Mix and match behaviors per workflow
- Reuse behaviors across workflows
3. Testability
- Test behaviors independently
- Mock behavior pipelines in tests
- Verify behavior ordering
4. Zero Breaking Changes
- All changes are additive
BehaviorPipelineproperty is nullableWithBehaviors()is optional- Existing workflows work unchanged
- Fast path when no behaviors registered (zero overhead)
5. Familiar Pattern
- ASP.NET Core middleware:
app.Use() - MediatR:
IPipelineBehavior<TRequest, TResponse> - Developers already know this pattern
Implementation Notes
Performance
- Zero overhead when no behaviors are registered (fast path check)
- Minimal overhead when behaviors are registered but do nothing
- Pipeline built once at workflow creation, not per execution
Error Handling
public class BehaviorExecutionException : Exception
{
public string BehaviorType { get; }
public string Stage { get; }
// Wraps exceptions thrown by behaviors for better diagnostics
}Execution Order
- Behaviors execute in registration order (outer to inner)
- Last registered behavior executes closest to core execution
- Similar to ASP.NET Core middleware ordering
Comparison with Existing Patterns
| Feature | Events | Agent Middleware | Pipeline Behaviors |
|---|---|---|---|
| Intercept execution | ❌ | ✅ | ✅ |
| Modify flow | ❌ | ✅ | ✅ |
| Workflow lifecycle | ❌ | ❌ | ✅ |
| Executor lifecycle | ❌ | ❌ | ✅ |
| Scope | Fire-and-forget | Per agent run | Per workflow/executor |
| Chain behaviors | ❌ | ✅ | ✅ |
Note: This complements agent middleware (AgentRunOptions.RunMiddleware) by providing workflow/executor-specific extension points, rather than replacing it.
Related Issues
- .NET: [Bug]: AgentRunOptions.RunMiddleware as described in docs does not exist #3951 - Bug about missing
AgentRunOptions.RunMiddleware(agent-level middleware) - .NET: [Bug]: [AG-UI] [Workflows] Session is always null in the middleware #3823 - Bug about session in AG-UI middleware
- Python: [Feature Request] Support custom WorkflowEvent emission for AG-UI/CopilotKit integration #3872 - Feature request for custom WorkflowEvent emission
This proposal addresses a different extensibility layer (workflow/executor behaviors) than existing agent middleware.
Questions for Maintainers
- Naming: Is "Pipeline Behavior" the right term, or would "Workflow Middleware" be clearer?
- Scope: Should behaviors have access to full
IWorkflowContext, or a limited subset? - Async: Should we support synchronous behaviors (
FuncvsValueTask)? - Properties: Should
Propertiesdictionary be mutable for passing data between behaviors? - DI Integration: Should behaviors support constructor injection from
IServiceProvider?
Proposed Implementation Plan
- ✅ Create behavior interfaces (
IWorkflowBehavior,IExecutorBehavior) - ✅ Create pipeline execution engine (
BehaviorPipeline) - ✅ Add registration mechanism (
WorkflowBehaviorOptions,WithBehaviors()) - ✅ Integrate into
WorkflowandExecutorexecution paths - ✅ Add unit tests for behavior execution and ordering
- ⏳ Add integration tests with sample behaviors
- ✅ Add XML documentation and code samples
- ⏳ Update workflow documentation
Additional Context
This feature enables enterprise-grade workflow observability and control without framework modifications. It follows established .NET patterns and maintains backward compatibility.
Ready to implement: POC is available and follows CONTRIBUTING.md guidelines.
Metadata
Metadata
Assignees
Labels
Type
Projects
Status