Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,18 @@ ValueTask NackOnAsyncWorker(IMessageConsumeContext context, Exception exception)

[MethodImpl(MethodImplOptions.AggressiveInlining)]
ValueTask Ack(IMessageConsumeContext context) {
// Capture locally — CheckpointCommitHandler can be nulled by Resubscribe/DisposeCommitHandler
// on another thread while the async worker is still completing a message.
var handler = CheckpointCommitHandler;

if (handler is null) return default;

var eventPosition = GetPositionFromContext(context);
LastProcessed = eventPosition;

context.LogContext.MessageAcked(context.MessageType, context.GlobalPosition);

return CheckpointCommitHandler!.Commit(
return handler.Commit(
new(eventPosition.Position!.Value, context.Sequence, eventPosition.Created) { LogContext = context.LogContext },
context.CancellationToken
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,99 @@ public override ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContex
}
}

/// <summary>
/// Validates that Ack does not throw when CheckpointCommitHandler is concurrently
/// nulled by Resubscribe/DisposeCommitHandler on another thread while the
/// AsyncHandlingFilter worker is still completing a message.
/// </summary>
[Test]
[Retry(3)]
public async Task Should_not_throw_nre_when_ack_races_with_resubscribe(CancellationToken ct) {
// Arrange
var loggerFactory = LoggingExtensions.GetLoggerFactory();
var nreTcs = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
var ackStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var proceedToAck = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

var options = new TestSubscriptionOptions {
SubscriptionId = "test-ack-race",
ThrowOnError = true,
CheckpointCommitBatchSize = 1,
CheckpointCommitDelayMs = 100
};

// A handler that signals when it's about to ack, then waits for the test to
// trigger resubscribe before the ack path runs.
var handler = new SlowAckHandler(ackStarted, proceedToAck);
var pipe = new ConsumePipe().AddDefaultConsumer(handler);

var checkpointStore = new NoOpCheckpointStore();

var subscription = new TestPollingSubscription(
options,
checkpointStore,
pipe,
loggerFactory,
eventCount: 20
);

// Act
await subscription.Subscribe(
_ => { },
(_, _, ex) => {
if (ex is NullReferenceException nre) nreTcs.TrySetResult(nre);
},
ct
);

// Wait until the handler has processed an event and is about to ack
var started = await Task.WhenAny(ackStarted.Task, Task.Delay(TimeSpan.FromSeconds(10), ct));
started.ShouldBe(ackStarted.Task, "Handler should have started processing an event");

// Now trigger Dropped → Resubscribe, which will null CheckpointCommitHandler
subscription.TriggerDropped();

// Give Resubscribe a moment to dispose the commit handler
await Task.Delay(200, ct);

// Let the handler complete — the AsyncHandlingFilter worker will now call Acknowledge → Ack.
// Without the fix, the commit handler is already null at this point, causing an NRE.
proceedToAck.TrySetResult();

// Assert — wait for either the NRE or a timeout
var result = await Task.WhenAny(nreTcs.Task, Task.Delay(TimeSpan.FromSeconds(5), ct));

if (result == nreTcs.Task) {
var exception = await nreTcs.Task;
Assert.Fail(
$"NullReferenceException in Ack path during resubscribe race: {exception}. " +
"CheckpointCommitHandler was null when Ack tried to call Commit()."
);
}

// Cleanup
await subscription.Unsubscribe(_ => { }, ct);
}

/// <summary>
/// A handler that signals the test when processing is happening,
/// then blocks until the test allows it to complete. This creates the
/// window for the race between Ack and Resubscribe.
/// </summary>
class SlowAckHandler(TaskCompletionSource ackStarted, TaskCompletionSource proceedToAck) : BaseEventHandler {
int _signaled;

public override async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context) {
// Signal only on the first event to avoid double-signaling
if (Interlocked.CompareExchange(ref _signaled, 1, 0) == 0) {
ackStarted.TrySetResult();
await proceedToAck.Task;
}

return EventHandlingStatus.Success;
}
}

record TestSubscriptionOptions : SubscriptionWithCheckpointOptions;

/// <summary>
Expand All @@ -182,6 +275,12 @@ class TestPollingSubscription(
) {
TaskRunner? _runner;

/// <summary>
/// Exposes the protected Dropped method so the test can trigger a resubscribe.
/// </summary>
public void TriggerDropped()
=> Dropped(DropReason.SubscriptionError, new InvalidOperationException("Simulated drop for race test"));

protected override ValueTask Subscribe(CancellationToken cancellationToken) {
_runner = new TaskRunner(token => PollEvents(token)).Start();

Expand Down
Loading