feat(Pub/Sub): Support WaitForProcessing and NackImmediately shutdown options #15545

Open
amanda-tarafa wants to merge 1 commit into googleapis:main from amanda-tarafa:pubsub-shutdown

@amanda-tarafa
Contributor

Previous shutdown mechanism is now obsolete.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces ShutdownOptions to SubscriberClient, enabling NackImmediately and WaitForProcessing shutdown modes. The changes include refactoring internal cancellation logic in SubscriberClientImpl, updating documentation, and expanding test coverage. Review feedback focuses on correcting multiple instances of the typo "Inmediately" to "Immediately" in constants, variables, and documentation.

Contributor Author

@amanda-tarafa amanda-tarafa left a comment

Typos fixed.

@amanda-tarafa
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a more flexible shutdown mechanism for SubscriberClient, allowing users to specify NackImmediately or WaitForProcessing modes through a new StopAsync overload. The internal implementation is refactored to use granular cancellation tokens for managing different stages of the shutdown process. Review feedback suggests removing a side effect from a property getter in the LinkedCancellationTokenSource helper class and ensuring it implements IDisposable to properly release resources. Additionally, a minor correction was recommended for missing spaces in a log message concatenation.

Contributor

@robertvoinescu-work robertvoinescu-work left a comment

LGTM: With some minor comments.

@amanda-tarafa
Contributor Author

(I'll add the method I accidentally removed, but that's just a wrapper)

… options.

Previous shutdown mechanism is now obsolete.
@amanda-tarafa
Contributor Author

(I'll add the method I accidentally removed, but that's just a wrapper)

This seems like a bug in the diff checker. I didn't remove the method; I removed the overriding implementation in SubscriberClientImpl, and I changed the inherited implementation in the base class SubscriberClient from throwing to delegating to the newest StopAsync method.

I won't add the allow breaking changes label now, so that we double check before merging that the only "breaking" change is this one, but we can merge as is otherwise.
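
For readers unfamiliar with the pattern, the change described above can be sketched roughly as follows. This is only an illustration of "base class delegates to the newest overload, implementation overrides only that one". Every type and member name here (`ClientBase`, `ClientImpl`, `StopSettings`, `HardStopToken`) is hypothetical and does not reflect the actual SubscriberClient signatures.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// All names below are hypothetical; they only mirror the shape described above.
public sealed class StopSettings
{
    public CancellationToken HardStopToken { get; set; }
}

public abstract class ClientBase
{
    // Older overload: the base class delegates instead of throwing,
    // so derived classes no longer need to override it.
    public virtual Task StopAsync(CancellationToken hardStopToken) =>
        StopAsync(new StopSettings { HardStopToken = hardStopToken });

    // Newest overload: assumed to throw in the base class until overridden.
    public virtual Task StopAsync(StopSettings settings) =>
        throw new NotImplementedException();
}

public sealed class ClientImpl : ClientBase
{
    public StopSettings LastSettings { get; private set; }

    // Only the newest overload is overridden.
    public override Task StopAsync(StopSettings settings)
    {
        LastSettings = settings;
        return Task.CompletedTask;
    }
}
```

With this shape, a caller using the older overload on `ClientImpl` still reaches the new implementation, which is why removing the override can look like a removed method to a diff-based API checker.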

@amanda-tarafa
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a more granular shutdown mechanism for the SubscriberClient, allowing users to choose between graceful processing of received messages or immediate negative acknowledgement via the new ShutdownOptions and ShutdownMode. The implementation involves refactoring internal cancellation logic to manage streaming pull, message handling, and push operations independently. Feedback provided focuses on correcting a typo in a source comment, removing a redundant null-conditional operator, and improving the management of background tasks in the shutdown sequence to prevent potential resource leaks.

HandlePush();
// Start stream-keep-alive ping
HandleStreamPing();
// On NackInmediately we nack everything that we have client side.
Contributor

medium

Typo: "NackInmediately" should be "NackImmediately".

            // On NackImmediately we nack everything that we have client side.

}

// If we need to nack anything, do so.
if (idsToNack?.Count > 0)
Contributor

medium

The null-conditional operator ?. is redundant here because idsToNack is initialized as a new HashSet<string> at line 1206 and is never null.

                if (idsToNack.Count > 0)

Comment on lines +290 to +295
void CancelAfterDelay(CancellationTokenSource cts, TimeSpan delay) =>
_taskHelper.Run(async () =>
{
await _taskHelper.ConfigureAwait(_scheduler.Delay(delay, CancellationToken.None));
cts.Cancel();
});
Contributor

medium

The background tasks created by CancelAfterDelay use CancellationToken.None for the delay. If the subscriber stops gracefully before the timeout, these tasks will remain active until the delay expires. To ensure consistency and prevent resource leaks, consider using the WithCancellationToken extension method for cancellable waits, or pass a cancellation token to _scheduler.Delay that is triggered when _mainTcs.Task completes.

References
  1. The WithCancellationToken extension method is available and should be used for cancellable waits on tasks.
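
A minimal sketch of the second option (passing a token to the delay), written against plain `Task.Delay` rather than the library's `_scheduler` and `_taskHelper`, whose exact signatures are not shown here. The class name, method name and the `stopped` parameter are assumptions for illustration only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancelAfterDelayDemo
{
    // Cancels `cts` after `delay`, unless `stopped` fires first, in which case
    // the delay ends early and `cts` is left untouched.
    public static async Task CancelAfterDelayAsync(
        CancellationTokenSource cts, TimeSpan delay, CancellationToken stopped)
    {
        try
        {
            await Task.Delay(delay, stopped).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Shutdown already finished; nothing left to cancel.
            return;
        }
        cts.Cancel();
    }
}
```

The `stopped` token would be one that is triggered once the main task completes, so the timer task does not outlive a graceful shutdown. In the real code the delay would presumably still go through the scheduler abstraction so that tests can control time.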

CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler);
// We create the continuation on a new task to avoid the continuation to be executed on this thread
// if `task` is already completed. Note that although the continuation "only equeues", if there's
// a task (i.e. the main loop) waiting to dequeue, the dequeu would happen synchronously after the
Contributor

@robertvoinescu-work robertvoinescu-work Apr 16, 2026

NIT: dequeu -> dequeue

Add(_eventReceiptModAckForExactlyOnceDelivery.WaitAsync(_softStopCts.Token)
.ContinueWith(task => ProcessSuccessfulMessages(msgs, msgIds), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler)
Add(_eventReceiptModAckForExactlyOnceDelivery.WaitAsync(_handlerCts.Token)
.ContinueWith(task => ProcessSuccessfulMessages(msgs, msgIds), _handlerCts.Token, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler)
Contributor

This ContinueWith wraps a delegate that itself returns a Task, so it returns a Task<Task>, where the outer task completes at the first await in ProcessSuccessfulMessages. On shutdown this might cause issues: we could shut down before ProcessSuccessfulMessages has finished, because we think all subtasks are complete. The simplest fix is to unwrap:

_eventReceiptModAckForExactlyOnceDelivery.WaitAsync(_handlerCts.Token)
                        .ContinueWith(task => ProcessSuccessfulMessages(msgs, msgIds), _handlerCts.Token, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler)
                        .Unwrap()

or we switch to using the await keyword instead of .ContinueWith
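
To show the Task<Task> concern in isolation, here is a small sketch that is independent of the Pub/Sub code. `SlowWorkAsync` is a hypothetical stand-in for an async method such as ProcessSuccessfulMessages; the `gate` task simulates work that has not finished yet.

```csharp
using System;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // Hypothetical stand-in for an async method like ProcessSuccessfulMessages.
    // It finishes only once `gate` has been completed by the caller.
    private static async Task SlowWorkAsync(Task gate, Action onDone)
    {
        await gate;
        onDone();
    }

    // Returns (outerCompletedEarly, unwrappedCompletedEarly, workDoneAtEnd).
    public static async Task<(bool, bool, bool)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        bool done = false;

        // ContinueWith over a Task-returning delegate yields Task<Task>.
        Task<Task> outer = Task.CompletedTask.ContinueWith(
            _ => SlowWorkAsync(gate.Task, () => done = true),
            TaskScheduler.Default);
        // Unwrap produces a proxy that completes only when the inner task does.
        Task unwrapped = outer.Unwrap();

        // Awaiting the outer task only waits until the delegate has returned,
        // i.e. until the async method reached its first incomplete await.
        await outer;
        bool outerCompletedEarly = outer.IsCompleted && !done;
        bool unwrappedCompletedEarly = unwrapped.IsCompleted;

        gate.SetResult(true);
        await unwrapped;
        return (outerCompletedEarly, unwrappedCompletedEarly, done);
    }
}
```

Code that tracks only the outer task would consider the work finished while `SlowWorkAsync` is still waiting; tracking the unwrapped task (or awaiting the method directly) avoids that.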

Contributor

This was here before so maybe a follow-up is fine.

}

HashSet<string> idsToNack = new HashSet<string>();
foreach (var leasingSet in messageSetsToNack)
Contributor

These aren't necessarily leasing sets, since they may also include expired sets. Maybe just messageSet?

Backoff = _disconnectBackoff;
}
}

Contributor

NIT: Blank line

}
// Lock ack/nack-queues, this is accessed concurrently here and in "master" thread.
lock (_lock)
if (wasInLeaseTracking)
Contributor

@robertvoinescu-work robertvoinescu-work Apr 16, 2026

Just want to confirm this logic, since it departs from the previous behavior for non-shutdown scenarios. We won't Ack/Nack any messages that have hit their max extension period (i.e. 'expired').

Add(delayTask, new NextAction(false, hasAcksOrNacks
? () => { ackActionToRetry(retryIds); StartPush(); }
: () => { extendActionToRetry(extendIds.Where(j => retryIds.Contains(j.Id))); StartPush(); }));
: () => { extendActionToRetry(extendIds.Where(j => retryIds.Contains(j.Id))); StartPush(); }));
Contributor

NIT: White space.

{
lock (_lock)
{
_messageSetsInLeasing.Remove(leaseTracking);
Contributor

As a side note for the future: ideally, I think the message state would be encapsulated within the message object itself, rather than tracked externally using HashSets in this broader scope. I know changing that would require a significant refactor, so I'm completely fine with the current approach. Just wanted to note it as a potential task to tackle when we have the bandwidth.

// if `task` is already completed. Note that although the continuation "only equeues", if there's
// a task (i.e. the main loop) waiting to dequeue, the dequeu would happen synchronously after the
// synchronous enqueue as well as the execution of the next task.
_taskHelper.Run(() =>
Contributor

Just want to confirm: this is the line that fixes the intermittent deadlock we were previously seeing in the integration tests, is that correct?

}
if (acks.Count > 0)
{
_pushInFlight += acks.Count;
Contributor

I think we should take the lock whenever we access _pushInFlight, as it's accessed here, in HandleAckResponse, and in the main StartAsync loop. Maybe it's worth creating a helper method to reduce the locking duplication, but your call:

private void IncrementPushInFlightCount(int increment)
{
    // Guard the counter with the same lock used for the ack/nack queues.
    lock (_lock)
    {
        _pushInFlight += increment;
    }
}
