From 904df2770bf0c658943ff6b2ea00e46a353e2b4a Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 30 Mar 2026 09:29:00 +0200 Subject: [PATCH 1/2] fix(subscriptions): prevent NRE when Ack races with Resubscribe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During resubscribe, DisposeCommitHandler() sets CheckpointCommitHandler to null. If the AsyncHandlingFilter worker thread is still processing a message, it calls Ack() → CheckpointCommitHandler!.Commit() and hits a NullReferenceException. The NRE cascades through Nack → Dropped → resubscribe in an infinite loop. Fix: capture CheckpointCommitHandler into a local variable and return early if null, using the standard pattern for concurrent null races. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../EventSubscriptionWithCheckpoint.cs | 8 +- .../ResubscribeOnHandlerFailureTests.cs | 101 ++++++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs index 11469191..2e3bd981 100644 --- a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs +++ b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs @@ -90,12 +90,18 @@ ValueTask NackOnAsyncWorker(IMessageConsumeContext context, Exception exception) [MethodImpl(MethodImplOptions.AggressiveInlining)] ValueTask Ack(IMessageConsumeContext context) { + // Capture locally — CheckpointCommitHandler can be nulled by Resubscribe/DisposeCommitHandler + // on another thread while the async worker is still completing a message. + var handler = CheckpointCommitHandler; + + if (handler is null) return default; + var eventPosition = GetPositionFromContext(context); LastProcessed = eventPosition; context.LogContext.MessageAcked(context.MessageType, context.GlobalPosition); - return CheckpointCommitHandler!.Commit( + return handler.Commit( new(eventPosition.Position!.Value, context.Sequence, eventPosition.Created) { LogContext = context.LogContext }, context.CancellationToken ); diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs index 405d634b..4d4327ae 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs @@ -156,6 +156,101 @@ public override ValueTask HandleEvent(IMessageConsumeContex } } + /// + /// Reproduces a race condition during resubscribe: the AsyncHandlingFilter worker thread + /// calls Acknowledge() → Ack() → CheckpointCommitHandler!.Commit() after Resubscribe() + /// has already set CheckpointCommitHandler to null via DisposeCommitHandler(). + /// The null-forgiving operator on line 98 causes a NullReferenceException. + /// + [Test] + [Retry(3)] + public async Task Should_not_throw_nre_when_ack_races_with_resubscribe(CancellationToken ct) { + // Arrange + var loggerFactory = LoggingExtensions.GetLoggerFactory(); + var nreTcs = new TaskCompletionSource(); + var ackStarted = new TaskCompletionSource(); + var proceedToAck = new TaskCompletionSource(); + + var options = new TestSubscriptionOptions { + SubscriptionId = "test-ack-race", + ThrowOnError = true, + CheckpointCommitBatchSize = 1, + CheckpointCommitDelayMs = 100 + }; + + // A handler that signals when it's about to ack, then waits for the test to + // trigger resubscribe before the ack path runs. + var handler = new SlowAckHandler(ackStarted, proceedToAck); + var pipe = new ConsumePipe().AddDefaultConsumer(handler); + + var checkpointStore = new NoOpCheckpointStore(); + + var subscription = new TestPollingSubscription( + options, + checkpointStore, + pipe, + loggerFactory, + eventCount: 20 + ); + + // Act + await subscription.Subscribe( + _ => { }, + (_, _, ex) => { + if (ex is NullReferenceException nre) nreTcs.TrySetResult(nre); + }, + ct + ); + + // Wait until the handler has processed an event and is about to ack + var started = await Task.WhenAny(ackStarted.Task, Task.Delay(TimeSpan.FromSeconds(10), ct)); + started.ShouldBe(ackStarted.Task, "Handler should have started processing an event"); + + // Now trigger Dropped → Resubscribe, which will null CheckpointCommitHandler + subscription.TriggerDropped(); + + // Give Resubscribe a moment to dispose the commit handler + await Task.Delay(200, ct); + + // Let the handler complete — the AsyncHandlingFilter worker will now call Acknowledge, + // which calls Ack → CheckpointCommitHandler!.Commit(). If the handler is already null, + // this is the NRE. + proceedToAck.TrySetResult(); + + // Assert — wait for either the NRE or a timeout + var result = await Task.WhenAny(nreTcs.Task, Task.Delay(TimeSpan.FromSeconds(5), ct)); + + if (result == nreTcs.Task) { + var exception = await nreTcs.Task; + Assert.Fail( + $"NullReferenceException in Ack path during resubscribe race: {exception}. " + + "CheckpointCommitHandler was null when Ack tried to call Commit()." + ); + } + + // Cleanup + await subscription.Unsubscribe(_ => { }, ct); + } + + /// + /// A handler that signals the test when processing is happening, + /// then blocks until the test allows it to complete. This creates the + /// window for the race between Ack and Resubscribe. + /// + class SlowAckHandler(TaskCompletionSource ackStarted, TaskCompletionSource proceedToAck) : BaseEventHandler { + int _signaled; + + public override async ValueTask HandleEvent(IMessageConsumeContext context) { + // Signal only on the first event to avoid double-signaling + if (Interlocked.CompareExchange(ref _signaled, 1, 0) == 0) { + ackStarted.TrySetResult(); + await proceedToAck.Task; + } + + return EventHandlingStatus.Success; + } + } + record TestSubscriptionOptions : SubscriptionWithCheckpointOptions; /// @@ -182,6 +277,12 @@ class TestPollingSubscription( ) { TaskRunner? _runner; + /// + /// Exposes the protected Dropped method so the test can trigger a resubscribe. + /// + public void TriggerDropped() + => Dropped(DropReason.SubscriptionError, new InvalidOperationException("Simulated drop for race test")); + protected override ValueTask Subscribe(CancellationToken cancellationToken) { _runner = new TaskRunner(token => PollEvents(token)).Start(); From aa1455a64dc5421d81b8ddb638db03a6cf42594b Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 30 Mar 2026 09:48:37 +0200 Subject: [PATCH 2/2] fix(test): address PR review feedback - Use RunContinuationsAsynchronously on all TaskCompletionSource instances to prevent inline continuations altering the cross-thread race behavior - Update test comments to describe current behavior instead of referencing the old buggy code and line numbers Co-Authored-By: Claude Opus 4.6 (1M context) --- .../ResubscribeOnHandlerFailureTests.cs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs index 4d4327ae..1ca17fc4 100644 --- a/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs +++ b/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs @@ -157,19 +157,18 @@ public override ValueTask HandleEvent(IMessageConsumeContex } /// - /// Reproduces a race condition during resubscribe: the AsyncHandlingFilter worker thread - /// calls Acknowledge() → Ack() → CheckpointCommitHandler!.Commit() after Resubscribe() - /// has already set CheckpointCommitHandler to null via DisposeCommitHandler(). - /// The null-forgiving operator on line 98 causes a NullReferenceException. + /// Validates that Ack does not throw when CheckpointCommitHandler is concurrently + /// nulled by Resubscribe/DisposeCommitHandler on another thread while the + /// AsyncHandlingFilter worker is still completing a message. /// [Test] [Retry(3)] public async Task Should_not_throw_nre_when_ack_races_with_resubscribe(CancellationToken ct) { // Arrange var loggerFactory = LoggingExtensions.GetLoggerFactory(); - var nreTcs = new TaskCompletionSource(); - var ackStarted = new TaskCompletionSource(); - var proceedToAck = new TaskCompletionSource(); + var nreTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var ackStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var proceedToAck = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var options = new TestSubscriptionOptions { SubscriptionId = "test-ack-race", @@ -212,9 +211,8 @@ await subscription.Subscribe( // Give Resubscribe a moment to dispose the commit handler await Task.Delay(200, ct); - // Let the handler complete — the AsyncHandlingFilter worker will now call Acknowledge, - // which calls Ack → CheckpointCommitHandler!.Commit(). If the handler is already null, - // this is the NRE. + // Let the handler complete — the AsyncHandlingFilter worker will now call Acknowledge → Ack. + // Without the fix, the commit handler is already null at this point, causing an NRE. proceedToAck.TrySetResult(); // Assert — wait for either the NRE or a timeout