diff --git a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs
index 11469191..2e3bd981 100644
--- a/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs
+++ b/src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs
@@ -90,12 +90,18 @@ ValueTask NackOnAsyncWorker(IMessageConsumeContext context, Exception exception)
 
     [MethodImpl(MethodImplOptions.AggressiveInlining)]
     ValueTask Ack(IMessageConsumeContext context) {
+        // Capture locally — CheckpointCommitHandler can be nulled by Resubscribe/DisposeCommitHandler
+        // on another thread while the async worker is still completing a message.
+        var handler = CheckpointCommitHandler;
+
+        if (handler is null) return default;
+
         var eventPosition = GetPositionFromContext(context);
         LastProcessed = eventPosition;
 
         context.LogContext.MessageAcked(context.MessageType, context.GlobalPosition);
 
-        return CheckpointCommitHandler!.Commit(
+        return handler.Commit(
             new(eventPosition.Position!.Value, context.Sequence, eventPosition.Created) { LogContext = context.LogContext },
             context.CancellationToken
         );
diff --git a/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs b/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs
index 405d634b..1ca17fc4 100644
--- a/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs
+++ b/src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs
@@ -156,6 +156,99 @@ public override ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContex
         }
     }
 
+    /// <summary>
+    /// Validates that Ack does not throw when CheckpointCommitHandler is concurrently
+    /// nulled by Resubscribe/DisposeCommitHandler on another thread while the
+    /// AsyncHandlingFilter worker is still completing a message.
+    /// </summary>
+    [Test]
+    [Retry(3)]
+    public async Task Should_not_throw_nre_when_ack_races_with_resubscribe(CancellationToken ct) {
+        // Arrange
+        var loggerFactory = LoggingExtensions.GetLoggerFactory();
+        var nreTcs = new TaskCompletionSource<NullReferenceException>(TaskCreationOptions.RunContinuationsAsynchronously);
+        var ackStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+        var proceedToAck = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+        var options = new TestSubscriptionOptions {
+            SubscriptionId = "test-ack-race",
+            ThrowOnError = true,
+            CheckpointCommitBatchSize = 1,
+            CheckpointCommitDelayMs = 100
+        };
+
+        // A handler that signals when it's about to ack, then waits for the test to
+        // trigger resubscribe before the ack path runs.
+        var handler = new SlowAckHandler(ackStarted, proceedToAck);
+        var pipe = new ConsumePipe().AddDefaultConsumer(handler);
+
+        var checkpointStore = new NoOpCheckpointStore();
+
+        var subscription = new TestPollingSubscription(
+            options,
+            checkpointStore,
+            pipe,
+            loggerFactory,
+            eventCount: 20
+        );
+
+        // Act
+        await subscription.Subscribe(
+            _ => { },
+            (_, _, ex) => {
+                if (ex is NullReferenceException nre) nreTcs.TrySetResult(nre);
+            },
+            ct
+        );
+
+        // Wait until the handler has processed an event and is about to ack
+        var started = await Task.WhenAny(ackStarted.Task, Task.Delay(TimeSpan.FromSeconds(10), ct));
+        started.ShouldBe(ackStarted.Task, "Handler should have started processing an event");
+
+        // Now trigger Dropped → Resubscribe, which will null CheckpointCommitHandler
+        subscription.TriggerDropped();
+
+        // Give Resubscribe a moment to dispose the commit handler
+        await Task.Delay(200, ct);
+
+        // Let the handler complete — the AsyncHandlingFilter worker will now call Acknowledge → Ack.
+        // Without the fix, the commit handler is already null at this point, causing an NRE.
+        proceedToAck.TrySetResult();
+
+        // Assert — wait for either the NRE or a timeout
+        var result = await Task.WhenAny(nreTcs.Task, Task.Delay(TimeSpan.FromSeconds(5), ct));
+
+        if (result == nreTcs.Task) {
+            var exception = await nreTcs.Task;
+            Assert.Fail(
+                $"NullReferenceException in Ack path during resubscribe race: {exception}. " +
+                "CheckpointCommitHandler was null when Ack tried to call Commit()."
+            );
+        }
+
+        // Cleanup
+        await subscription.Unsubscribe(_ => { }, ct);
+    }
+
+    /// <summary>
+    /// A handler that signals the test when processing is happening,
+    /// then blocks until the test allows it to complete. This creates the
+    /// window for the race between Ack and Resubscribe.
+    /// </summary>
+    class SlowAckHandler(TaskCompletionSource ackStarted, TaskCompletionSource proceedToAck) : BaseEventHandler {
+        int _signaled;
+
+        public override async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context) {
+            // Signal only on the first event to avoid double-signaling
+            if (Interlocked.CompareExchange(ref _signaled, 1, 0) == 0) {
+                ackStarted.TrySetResult();
+                await proceedToAck.Task;
+            }
+
+            return EventHandlingStatus.Success;
+        }
+    }
+
     record TestSubscriptionOptions : SubscriptionWithCheckpointOptions;
 
     /// <summary>
@@ -182,6 +275,12 @@ class TestPollingSubscription(
     ) {
         TaskRunner? _runner;
 
+        /// <summary>
+        /// Exposes the protected Dropped method so the test can trigger a resubscribe.
+        /// </summary>
+        public void TriggerDropped()
+            => Dropped(DropReason.SubscriptionError, new InvalidOperationException("Simulated drop for race test"));
+
         protected override ValueTask Subscribe(CancellationToken cancellationToken) {
             _runner = new TaskRunner(token => PollEvents(token)).Start();
 