From 4513bcc9b00f0a8e031479189c0d6d010df68ad9 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Thu, 2 Apr 2026 15:05:55 -0700 Subject: [PATCH 1/5] Improve workflow unit tests --- .../AggregatingExecutorTests.cs | 77 ++++++++++ .../InputWaiterAndOutputFilterTests.cs | 143 ++++++++++++++++++ .../RoundRobinGroupChatManagerTests.cs | 138 +++++++++++++++++ 3 files changed, 358 insertions(+) create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AggregatingExecutorTests.cs create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AggregatingExecutorTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AggregatingExecutorTests.cs new file mode 100644 index 0000000000..6e82425994 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AggregatingExecutorTests.cs @@ -0,0 +1,77 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Threading.Tasks; +using FluentAssertions; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public class AggregatingExecutorTests +{ + [Fact] + public async Task Test_HandleAsync_AggregatesIncrementallyAsync() + { + AggregatingExecutor executor = new("sum", (aggregate, input) => + aggregate == null ? input : $"{aggregate}+{input}"); + + TestWorkflowContext context = new(executor.Id); + + string? result1 = await executor.HandleAsync("a", context, default); + string? result2 = await executor.HandleAsync("b", context, default); + string? result3 = await executor.HandleAsync("c", context, default); + + result1.Should().Be("a"); + result2.Should().Be("a+b"); + result3.Should().Be("a+b+c"); + } + + [Fact] + public async Task Test_HandleAsync_FirstCallReceivesNullAggregateAsync() + { + string? receivedAggregate = "sentinel"; + + AggregatingExecutor executor = new("first-call", (aggregate, input) => + { + receivedAggregate = aggregate; + return input; + }); + + TestWorkflowContext context = new(executor.Id); + await executor.HandleAsync("hello", context, default); + + receivedAggregate.Should().BeNull("the first invocation should receive a null aggregate for reference types"); + } + + [Fact] + public async Task Test_HandleAsync_AggregatorReturningNullClearsStateAsync() + { + AggregatingExecutor executor = new("nullable", (aggregate, input) => + input == "clear" ? null : (aggregate ?? "") + input); + + TestWorkflowContext context = new(executor.Id); + + string? result1 = await executor.HandleAsync("a", context, default); + result1.Should().Be("a"); + + string? result2 = await executor.HandleAsync("clear", context, default); + result2.Should().BeNull("the aggregator returned null to clear the state"); + + // After clearing, the next call should receive null aggregate again + string? result3 = await executor.HandleAsync("b", context, default); + result3.Should().Be("b", "the aggregate should restart from null after being cleared"); + } + + [Fact] + public async Task Test_HandleAsync_PersistsStateBetweenCallsAsync() + { + AggregatingExecutor executor = new("counter", (aggregate, _) => + aggregate == null ? "1" : $"{int.Parse(aggregate) + 1}"); + + TestWorkflowContext context = new(executor.Id); + + for (int i = 1; i <= 5; i++) + { + string? result = await executor.HandleAsync("tick", context, default); + result.Should().Be($"{i}", "the aggregate should increment with each call"); + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs new file mode 100644 index 0000000000..990d4671d3 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs @@ -0,0 +1,143 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Agents.AI.Workflows.Execution; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public sealed class InputWaiterTests : IDisposable +{ + private readonly InputWaiter _waiter = new(); + + public void Dispose() + { + this._waiter.Dispose(); + GC.SuppressFinalize(this); + } + + [Fact] + public async Task Test_WaitForInputAsync_CompletesAfterSignalAsync() + { + this._waiter.SignalInput(); + + // WaitForInputAsync should complete immediately since input was already signaled + Task waitTask = this._waiter.WaitForInputAsync(CancellationToken.None); + Task completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(1))); + + completed.Should().BeSameAs(waitTask, "the wait task should complete before the timeout"); + } + + [Fact] + public async Task Test_WaitForInputAsync_BlocksUntilSignaledAsync() + { + Task waitTask = this._waiter.WaitForInputAsync(TimeSpan.FromSeconds(5)); + + await Task.Delay(50); + waitTask.IsCompleted.Should().BeFalse("the waiter should block until input is signaled"); + + this._waiter.SignalInput(); + + Task completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(1))); + completed.Should().BeSameAs(waitTask, "the wait task should complete after being signaled"); + } + + [Fact] + public void Test_SignalInput_DoubleSignalDoesNotThrow() + { + // Binary semaphore behavior: double signal should be idempotent + FluentActions.Invoking(() => + { + this._waiter.SignalInput(); + this._waiter.SignalInput(); + }).Should().NotThrow("double signaling should be handled gracefully"); + } + + [Fact] + public async Task Test_WaitForInputAsync_RespectsCancellationAsync() + { + using CancellationTokenSource cts = new(); + Task waitTask = this._waiter.WaitForInputAsync(cts.Token); + + cts.Cancel(); + + Func act = () => waitTask; + await act.Should().ThrowAsync(); + } + + [Fact] + public async Task Test_WaitForInputAsync_DoesNotCompleteWhenNotSignaledAsync() + { + using CancellationTokenSource cts = new(); + Task waitTask = this._waiter.WaitForInputAsync(cts.Token); + Task completed = await Task.WhenAny(waitTask, Task.Delay(100)); + + completed.Should().NotBeSameAs(waitTask, "the wait task should not complete when input is not signaled"); + + // Cancel and observe the pending task to avoid an unobserved exception on Dispose + cts.Cancel(); + try { await waitTask; } + catch (OperationCanceledException) { } + } + + [Fact] + public async Task Test_WaitForInputAsync_CanBeSignaledMultipleTimesSequentiallyAsync() + { + // First signal/wait cycle + this._waiter.SignalInput(); + await this._waiter.WaitForInputAsync(TimeSpan.FromSeconds(1)); + + // Second signal/wait cycle + this._waiter.SignalInput(); + await this._waiter.WaitForInputAsync(TimeSpan.FromSeconds(1)); + } +} + +public class OutputFilterTests +{ + private static OutputFilter CreateFilterWithOutputFrom(string outputExecutorId) + { + NoOpExecutor start = new("start"); + NoOpExecutor end = new("end"); + + Workflow workflow = new WorkflowBuilder("start") + .AddEdge(start, end) + .WithOutputFrom(outputExecutorId == "end" ? end : start) + .Build(); + + return new OutputFilter(workflow); + } + + [Fact] + public void Test_CanOutput_ReturnsTrueForRegisteredExecutor() + { + OutputFilter filter = CreateFilterWithOutputFrom("end"); + + filter.CanOutput("end", "some output").Should().BeTrue("the executor was registered via WithOutputFrom"); + } + + [Fact] + public void Test_CanOutput_ReturnsFalseForUnregisteredExecutor() + { + OutputFilter filter = CreateFilterWithOutputFrom("end"); + + filter.CanOutput("start", "some output").Should().BeFalse("start was not registered as an output executor"); + } + + [Fact] + public void Test_CanOutput_ReturnsFalseForNonExistentExecutor() + { + OutputFilter filter = CreateFilterWithOutputFrom("end"); + + filter.CanOutput("nonexistent", "some output").Should().BeFalse("an executor not in the workflow should not be an output executor"); + } + + private sealed class NoOpExecutor(string id) : Executor(id) + { + protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) + => protocolBuilder.ConfigureRoutes(routeBuilder => + routeBuilder.AddHandler((msg, ctx) => ctx.SendMessageAsync(msg))); + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs new file mode 100644 index 0000000000..ed21a052a8 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs @@ -0,0 +1,138 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +public class RoundRobinGroupChatManagerTests +{ + [Fact] + public async Task Test_SelectNextAgent_CyclesInOrderAsync() + { + TestEchoAgent agent1 = new(id: "agent1"); + TestEchoAgent agent2 = new(id: "agent2"); + TestEchoAgent agent3 = new(id: "agent3"); + List agents = [agent1, agent2, agent3]; + List history = []; + + RoundRobinGroupChatManager manager = new(agents); + + AIAgent first = await manager.SelectNextAgentAsync(history); + AIAgent second = await manager.SelectNextAgentAsync(history); + AIAgent third = await manager.SelectNextAgentAsync(history); + + first.Should().BeSameAs(agent1); + second.Should().BeSameAs(agent2); + third.Should().BeSameAs(agent3); + } + + [Fact] + public async Task Test_SelectNextAgent_WrapsAroundAsync() + { + TestEchoAgent agent1 = new(id: "agent1"); + TestEchoAgent agent2 = new(id: "agent2"); + List agents = [agent1, agent2]; + List history = []; + + RoundRobinGroupChatManager manager = new(agents); + + await manager.SelectNextAgentAsync(history); + await manager.SelectNextAgentAsync(history); + + AIAgent wrappedAgent = await manager.SelectNextAgentAsync(history); + + wrappedAgent.Should().BeSameAs(agent1, "the manager should wrap around to the first agent after cycling through all agents"); + } + + [Fact] + public async Task Test_ShouldTerminate_DefaultBehaviorTerminatesAtMaxIterationsAsync() + { + TestEchoAgent agent1 = new(id: "agent1"); + List agents = [agent1]; + List history = []; + + RoundRobinGroupChatManager manager = new(agents) { MaximumIterationCount = 3 }; + + manager.IterationCount = 2; + bool shouldTerminateBefore = await manager.ShouldTerminateAsync(history); + shouldTerminateBefore.Should().BeFalse("the iteration count has not yet reached the maximum"); + + manager.IterationCount = 3; + bool shouldTerminateAt = await manager.ShouldTerminateAsync(history); + shouldTerminateAt.Should().BeTrue("the iteration count has reached the maximum"); + } + + [Fact] + public async Task Test_ShouldTerminate_CustomFuncTerminatesEarlyAsync() + { + TestEchoAgent agent1 = new(id: "agent1"); + List agents = [agent1]; + List history = [new ChatMessage(ChatRole.Assistant, "done")]; + + RoundRobinGroupChatManager manager = new(agents, + shouldTerminateFunc: (_, messages, _) => new(messages.Any(m => m.Text == "done"))) + { + MaximumIterationCount = 100 + }; + + bool shouldTerminate = await manager.ShouldTerminateAsync(history); + shouldTerminate.Should().BeTrue("the custom termination function should cause early termination"); + } + + [Fact] + public async Task Test_ShouldTerminate_CustomFuncDoesNotTerminateWhenNotMetAsync() + { + TestEchoAgent agent1 = new(id: "agent1"); + List agents = [agent1]; + List history = [new ChatMessage(ChatRole.Assistant, "continue")]; + + RoundRobinGroupChatManager manager = new(agents, + shouldTerminateFunc: (_, messages, _) => new(messages.Any(m => m.Text == "done"))) + { + MaximumIterationCount = 100 + }; + + bool shouldTerminate = await manager.ShouldTerminateAsync(history); + shouldTerminate.Should().BeFalse("the custom termination function should not cause termination when condition is not met"); + } + + [Fact] + public async Task Test_Reset_ResetsIterationCountAndAgentIndexAsync() + { + TestEchoAgent agent1 = new(id: "agent1"); + TestEchoAgent agent2 = new(id: "agent2"); + List agents = [agent1, agent2]; + List history = []; + + RoundRobinGroupChatManager manager = new(agents); + manager.IterationCount = 5; + + // Advance the internal index past the first agent + await manager.SelectNextAgentAsync(history); + + manager.Reset(); + + manager.IterationCount.Should().Be(0, "Reset should clear the iteration count"); + + AIAgent afterReset = await manager.SelectNextAgentAsync(history); + afterReset.Should().BeSameAs(agent1, "Reset should cause the next selection to start from the first agent"); + } + + [Fact] + public void Test_Constructor_ThrowsOnNullAgents() + { + FluentActions.Invoking(() => new RoundRobinGroupChatManager(null!)) + .Should().Throw(); + } + + [Fact] + public void Test_Constructor_ThrowsOnEmptyAgents() + { + FluentActions.Invoking(() => new RoundRobinGroupChatManager([])) + .Should().Throw(); + } +} From 90952ed176b69cc011335c37b254f4ee2585790b Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Thu, 2 Apr 2026 15:32:12 -0700 Subject: [PATCH 2/5] Update test name prefix for clarity. --- .../AggregatingExecutorTests.cs | 8 ++++---- .../InputWaiterAndOutputFilterTests.cs | 18 +++++++++--------- .../RoundRobinGroupChatManagerTests.cs | 16 ++++++++-------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AggregatingExecutorTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AggregatingExecutorTests.cs index 6e82425994..fac76744e9 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AggregatingExecutorTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AggregatingExecutorTests.cs @@ -8,7 +8,7 @@ namespace Microsoft.Agents.AI.Workflows.UnitTests; public class AggregatingExecutorTests { [Fact] - public async Task Test_HandleAsync_AggregatesIncrementallyAsync() + public async Task AggregatingExecutor_HandleAsync_AggregatesIncrementallyAsync() { AggregatingExecutor executor = new("sum", (aggregate, input) => aggregate == null ? input : $"{aggregate}+{input}"); @@ -25,7 +25,7 @@ public async Task Test_HandleAsync_AggregatesIncrementallyAsync() } [Fact] - public async Task Test_HandleAsync_FirstCallReceivesNullAggregateAsync() + public async Task AggregatingExecutor_HandleAsync_FirstCallReceivesNullAggregateAsync() { string? receivedAggregate = "sentinel"; @@ -42,7 +42,7 @@ public async Task Test_HandleAsync_FirstCallReceivesNullAggregateAsync() } [Fact] - public async Task Test_HandleAsync_AggregatorReturningNullClearsStateAsync() + public async Task AggregatingExecutor_HandleAsync_AggregatorReturningNullClearsStateAsync() { AggregatingExecutor executor = new("nullable", (aggregate, input) => input == "clear" ? null : (aggregate ?? "") + input); @@ -61,7 +61,7 @@ public async Task Test_HandleAsync_AggregatorReturningNullClearsStateAsync() } [Fact] - public async Task Test_HandleAsync_PersistsStateBetweenCallsAsync() + public async Task AggregatingExecutor_HandleAsync_PersistsStateBetweenCallsAsync() { AggregatingExecutor executor = new("counter", (aggregate, _) => aggregate == null ? "1" : $"{int.Parse(aggregate) + 1}"); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs index 990d4671d3..55f02db443 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs @@ -19,7 +19,7 @@ public void Dispose() } [Fact] - public async Task Test_WaitForInputAsync_CompletesAfterSignalAsync() + public async Task InputWaiter_WaitForInputAsync_CompletesAfterSignalAsync() { this._waiter.SignalInput(); @@ -31,7 +31,7 @@ public async Task Test_WaitForInputAsync_CompletesAfterSignalAsync() } [Fact] - public async Task Test_WaitForInputAsync_BlocksUntilSignaledAsync() + public async Task InputWaiter_WaitForInputAsync_BlocksUntilSignaledAsync() { Task waitTask = this._waiter.WaitForInputAsync(TimeSpan.FromSeconds(5)); @@ -45,7 +45,7 @@ public async Task Test_WaitForInputAsync_BlocksUntilSignaledAsync() } [Fact] - public void Test_SignalInput_DoubleSignalDoesNotThrow() + public void InputWaiter_SignalInput_DoubleSignalDoesNotThrow() { // Binary semaphore behavior: double signal should be idempotent FluentActions.Invoking(() => @@ -56,7 +56,7 @@ public void Test_SignalInput_DoubleSignalDoesNotThrow() } [Fact] - public async Task Test_WaitForInputAsync_RespectsCancellationAsync() + public async Task InputWaiter_WaitForInputAsync_RespectsCancellationAsync() { using CancellationTokenSource cts = new(); Task waitTask = this._waiter.WaitForInputAsync(cts.Token); @@ -68,7 +68,7 @@ public async Task Test_WaitForInputAsync_RespectsCancellationAsync() } [Fact] - public async Task Test_WaitForInputAsync_DoesNotCompleteWhenNotSignaledAsync() + public async Task InputWaiter_WaitForInputAsync_DoesNotCompleteWhenNotSignaledAsync() { using CancellationTokenSource cts = new(); Task waitTask = this._waiter.WaitForInputAsync(cts.Token); @@ -83,7 +83,7 @@ public async Task Test_WaitForInputAsync_DoesNotCompleteWhenNotSignaledAsync() } [Fact] - public async Task Test_WaitForInputAsync_CanBeSignaledMultipleTimesSequentiallyAsync() + public async Task InputWaiter_WaitForInputAsync_CanBeSignaledMultipleTimesSequentiallyAsync() { // First signal/wait cycle this._waiter.SignalInput(); @@ -111,7 +111,7 @@ private static OutputFilter CreateFilterWithOutputFrom(string outputExecutorId) } [Fact] - public void Test_CanOutput_ReturnsTrueForRegisteredExecutor() + public void OutputFilter_CanOutput_ReturnsTrueForRegisteredExecutor() { OutputFilter filter = CreateFilterWithOutputFrom("end"); @@ -119,7 +119,7 @@ public void Test_CanOutput_ReturnsTrueForRegisteredExecutor() } [Fact] - public void Test_CanOutput_ReturnsFalseForUnregisteredExecutor() + public void OutputFilter_CanOutput_ReturnsFalseForUnregisteredExecutor() { OutputFilter filter = CreateFilterWithOutputFrom("end"); @@ -127,7 +127,7 @@ public void Test_CanOutput_ReturnsFalseForUnregisteredExecutor() } [Fact] - public void Test_CanOutput_ReturnsFalseForNonExistentExecutor() + public void OutputFilter_CanOutput_ReturnsFalseForNonExistentExecutor() { OutputFilter filter = CreateFilterWithOutputFrom("end"); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs index ed21a052a8..d1c545f202 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs @@ -11,7 +11,7 @@ namespace Microsoft.Agents.AI.Workflows.UnitTests; public class RoundRobinGroupChatManagerTests { [Fact] - public async Task Test_SelectNextAgent_CyclesInOrderAsync() + public async Task RoundRobinGroupChat_SelectNextAgent_CyclesInOrderAsync() { TestEchoAgent agent1 = new(id: "agent1"); TestEchoAgent agent2 = new(id: "agent2"); @@ -31,7 +31,7 @@ public async Task Test_SelectNextAgent_CyclesInOrderAsync() } [Fact] - public async Task Test_SelectNextAgent_WrapsAroundAsync() + public async Task RoundRobinGroupChat_SelectNextAgent_WrapsAroundAsync() { TestEchoAgent agent1 = new(id: "agent1"); TestEchoAgent agent2 = new(id: "agent2"); @@ -49,7 +49,7 @@ public async Task Test_SelectNextAgent_WrapsAroundAsync() } [Fact] - public async Task Test_ShouldTerminate_DefaultBehaviorTerminatesAtMaxIterationsAsync() + public async Task RoundRobinGroupChat_ShouldTerminate_DefaultBehaviorTerminatesAtMaxIterationsAsync() { TestEchoAgent agent1 = new(id: "agent1"); List agents = [agent1]; @@ -67,7 +67,7 @@ public async Task Test_ShouldTerminate_DefaultBehaviorTerminatesAtMaxIterationsA } [Fact] - public async Task Test_ShouldTerminate_CustomFuncTerminatesEarlyAsync() + public async Task RoundRobinGroupChat_ShouldTerminate_CustomFuncTerminatesEarlyAsync() { TestEchoAgent agent1 = new(id: "agent1"); List agents = [agent1]; @@ -84,7 +84,7 @@ public async Task Test_ShouldTerminate_CustomFuncTerminatesEarlyAsync() } [Fact] - public async Task Test_ShouldTerminate_CustomFuncDoesNotTerminateWhenNotMetAsync() + public async Task RoundRobinGroupChat_ShouldTerminate_CustomFuncDoesNotTerminateWhenNotMetAsync() { TestEchoAgent agent1 = new(id: "agent1"); List agents = [agent1]; @@ -101,7 +101,7 @@ public async Task Test_ShouldTerminate_CustomFuncDoesNotTerminateWhenNotMetAsync } [Fact] - public async Task Test_Reset_ResetsIterationCountAndAgentIndexAsync() + public async Task RoundRobinGroupChat_Reset_ResetsIterationCountAndAgentIndexAsync() { TestEchoAgent agent1 = new(id: "agent1"); TestEchoAgent agent2 = new(id: "agent2"); @@ -123,14 +123,14 @@ public async Task Test_Reset_ResetsIterationCountAndAgentIndexAsync() } [Fact] - public void Test_Constructor_ThrowsOnNullAgents() + public void RoundRobinGroupChat_Constructor_ThrowsOnNullAgents() { FluentActions.Invoking(() => new RoundRobinGroupChatManager(null!)) .Should().Throw(); } [Fact] - public void Test_Constructor_ThrowsOnEmptyAgents() + public void RoundRobinGroupChat_Constructor_ThrowsOnEmptyAgents() { FluentActions.Invoking(() => new RoundRobinGroupChatManager([])) .Should().Throw(); From 1f8078e4968ec5f3c625529dc325aea632dce62e Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Thu, 2 Apr 2026 16:04:41 -0700 Subject: [PATCH 3/5] Update tests to surface any errors. --- .../InputWaiterAndOutputFilterTests.cs | 2 ++ .../RoundRobinGroupChatManagerTests.cs | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs index 55f02db443..77c0160200 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/InputWaiterAndOutputFilterTests.cs @@ -28,6 +28,7 @@ public async Task InputWaiter_WaitForInputAsync_CompletesAfterSignalAsync() Task completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(1))); completed.Should().BeSameAs(waitTask, "the wait task should complete before the timeout"); + await waitTask; } [Fact] @@ -42,6 +43,7 @@ public async Task InputWaiter_WaitForInputAsync_BlocksUntilSignaledAsync() Task completed = await Task.WhenAny(waitTask, Task.Delay(TimeSpan.FromSeconds(1))); completed.Should().BeSameAs(waitTask, "the wait task should complete after being signaled"); + await waitTask; } [Fact] diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs index d1c545f202..3c87507ec6 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/RoundRobinGroupChatManagerTests.cs @@ -126,7 +126,8 @@ public async Task RoundRobinGroupChat_Reset_ResetsIterationCountAndAgentIndexAsy public void RoundRobinGroupChat_Constructor_ThrowsOnNullAgents() { FluentActions.Invoking(() => new RoundRobinGroupChatManager(null!)) - .Should().Throw(); + .Should().Throw() + .WithParameterName("agents"); } [Fact] From 01a19396008b56ec811383e9738ffab877a58b80 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Fri, 3 Apr 2026 11:12:11 -0700 Subject: [PATCH 4/5] fix check-point restore-time race in off-thread workflow event stream --- .../Execution/StreamingRunEventStream.cs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs index 4eb1290961..ffcc61dad4 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs @@ -287,10 +287,6 @@ public void ClearBufferedEvents() { // Discard each event (including InternalCompletionSignals) } - - // After clearing, signal the run loop to continue if needed - // The run loop will send a new completion signal when it finishes processing from the restored state - this.SignalInput(); } public async ValueTask StopAsync() From 5aed20efd6a3035c2daf3f98eaa0c5023414c7e4 Mon Sep 17 00:00:00 2001 From: Peter Ibekwe Date: Mon, 6 Apr 2026 16:38:44 -0700 Subject: [PATCH 5/5] Fixes an intermittent checkpoint-restore race in in-process workflow runs. --- .../InProc/InProcessRunnerContext.cs | 6 +++ .../CheckpointResumeTests.cs | 42 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs index f0bb8cac26..d6c7d301e3 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs @@ -419,6 +419,12 @@ internal async ValueTask ImportStateAsync(Checkpoint checkpoint) .Select(id => this.EnsureExecutorAsync(id, tracer: null).AsTask()) .ToArray(); + // Discard queued external deliveries from the superseded timeline so a runtime + // restore cannot apply stale responses after importing the checkpoint state. + while (this._queuedExternalDeliveries.TryDequeue(out _)) + { + } + this._nextStep = new StepContext(); this._nextStep.ImportMessages(importedState.QueuedMessages); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs index 9d4b514af7..53ea644712 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs @@ -279,6 +279,48 @@ internal async Task Checkpoint_Restore_WithPendingRequests_RepublishesRequestInf "the workflow should be able to continue after the runtime restore replay"); } + /// + /// Verifies that restoring a live run clears any queued external responses from the + /// superseded timeline before importing checkpoint state. + /// + [Fact] + internal async Task Checkpoint_Restore_ClearsQueuedExternalResponsesBeforeImportAsync() + { + Workflow workflow = CreateSimpleRequestWorkflow(); + CheckpointManager checkpointManager = CheckpointManager.CreateInMemory(); + InProcessExecutionEnvironment env = ExecutionEnvironment.InProcess_Lockstep.ToWorkflowExecutionEnvironment(); + + await using StreamingRun run = await env.WithCheckpointing(checkpointManager) + .RunStreamingAsync(workflow, "Hello"); + + (ExternalRequest pendingRequest, CheckpointInfo checkpoint) = await CapturePendingRequestAndCheckpointAsync(run); + + await run.SendResponseAsync(pendingRequest.CreateResponse("World")); + await run.RestoreCheckpointAsync(checkpoint); + + List restoredEvents = await ReadToHaltAsync(run); + ExternalRequest replayedRequest = restoredEvents.OfType() + .Select(evt => evt.Request) + .Should() + .ContainSingle("the restored run should still be waiting for the checkpointed request") + .Subject; + + restoredEvents.OfType().Should().BeEmpty( + "a queued response from the superseded timeline should not be processed after restore"); + RunStatus statusAfterRestore = await run.GetStatusAsync(); + statusAfterRestore.Should().Be(RunStatus.PendingRequests, + "the restored run should remain pending until a post-restore response is sent"); + + await run.SendResponseAsync(replayedRequest.CreateResponse("Again")); + + List completionEvents = await ReadToHaltAsync(run); + completionEvents.OfType().Should().BeEmpty( + "the restored request should complete cleanly once a new response is provided"); + RunStatus finalStatus = await run.GetStatusAsync(); + finalStatus.Should().Be(RunStatus.Idle, + "the workflow should finish once the replayed request receives a fresh response"); + } + /// /// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow. ///