feat(Pub/Sub): Support WaitForProcessing and NackImmediately shutdown options #15545
amanda-tarafa wants to merge 1 commit into googleapis:main
Conversation
Code Review
This pull request introduces ShutdownOptions to SubscriberClient, enabling NackImmediately and WaitForProcessing shutdown modes. The changes include refactoring internal cancellation logic in SubscriberClientImpl, updating documentation, and expanding test coverage. Review feedback focuses on correcting multiple instances of the typo "Inmediately" to "Immediately" in constants, variables, and documentation.
amanda-tarafa
left a comment
Typos fixed.
Force-pushed from 3ed3c8d to f31e1b5
/gemini review
Code Review
This pull request introduces a more flexible shutdown mechanism for SubscriberClient, allowing users to specify NackImmediately or WaitForProcessing modes through a new StopAsync overload. The internal implementation is refactored to use granular cancellation tokens for managing different stages of the shutdown process. Review feedback suggests removing a side effect from a property getter in the LinkedCancellationTokenSource helper class and ensuring it implements IDisposable to properly release resources. Additionally, a minor correction was recommended for missing spaces in a log message concatenation.
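The `IDisposable` suggestion above could be sketched as follows. This is a minimal sketch, assuming the helper wraps `CancellationTokenSource.CreateLinkedTokenSource`; the member names here are guesses, not the PR's actual code:

```csharp
using System;
using System.Threading;

// Hypothetical shape for the LinkedCancellationTokenSource helper mentioned in the review.
sealed class LinkedCancellationTokenSource : IDisposable
{
    private readonly CancellationTokenSource _linkedCts;

    public LinkedCancellationTokenSource(CancellationToken token1, CancellationToken token2) =>
        _linkedCts = CancellationTokenSource.CreateLinkedTokenSource(token1, token2);

    // Plain getter with no side effects, per the review feedback.
    public CancellationToken Token => _linkedCts.Token;

    public void Cancel() => _linkedCts.Cancel();

    // A linked source holds registrations on its parent tokens; disposing releases them.
    public void Dispose() => _linkedCts.Dispose();
}
```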
robertvoinescu-work
left a comment
LGTM, with some minor comments.
Force-pushed from f31e1b5 to c719b95
(I'll add the method I accidentally removed, but that's just a wrapper)
… options. Previous shutdown mechanism is now obsolete.
Force-pushed from c719b95 to 366570e
This seems like a bug in the diff checker: I didn't remove the method, I removed the overriding implementation in … I won't add the …
/gemini review
Code Review
This pull request introduces a more granular shutdown mechanism for the SubscriberClient, allowing users to choose between graceful processing of received messages or immediate negative acknowledgement via the new ShutdownOptions and ShutdownMode. The implementation involves refactoring internal cancellation logic to manage streaming pull, message handling, and push operations independently. Feedback provided focuses on correcting a typo in a source comment, removing a redundant null-conditional operator, and improving the management of background tasks in the shutdown sequence to prevent potential resource leaks.
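For orientation, here is a hypothetical caller-side view of the two shutdown modes described above. `CreateAsync`, `StartAsync`, and the `CancellationToken`-based `StopAsync` are the existing Google.Cloud.PubSub.V1 surface; the `ShutdownOptions`-taking overload and its exact shape are assumptions based only on the names in this PR:

```csharp
using System;
using System.Threading.Tasks;
using Google.Cloud.PubSub.V1;

SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    SubscriptionName.FromProjectSubscription("my-project", "my-subscription"));
Task startTask = subscriber.StartAsync(
    (msg, ct) => Task.FromResult(SubscriberClient.Reply.Ack));

// WaitForProcessing: let already-received messages finish handling before stopping.
// (Hypothetical overload; the final API may differ.)
await subscriber.StopAsync(new ShutdownOptions(ShutdownMode.WaitForProcessing));

// ShutdownMode.NackImmediately would instead nack everything held client-side.
```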
```csharp
HandlePush();
// Start stream-keep-alive ping
HandleStreamPing();
// On NackInmediately we nack everything that we have client side.
}
```

```csharp
// If we need to nack anything, do so.
if (idsToNack?.Count > 0)
```
```csharp
void CancelAfterDelay(CancellationTokenSource cts, TimeSpan delay) =>
    _taskHelper.Run(async () =>
    {
        await _taskHelper.ConfigureAwait(_scheduler.Delay(delay, CancellationToken.None));
        cts.Cancel();
    });
```
The background tasks created by CancelAfterDelay use CancellationToken.None for the delay. If the subscriber stops gracefully before the timeout, these tasks will remain active until the delay expires. To ensure consistency and prevent resource leaks, consider using the WithCancellationToken extension method for cancellable waits, or pass a cancellation token to _scheduler.Delay that is triggered when _mainTcs.Task completes.
References
- The WithCancellationToken extension method is available and should be used for cancellable waits on tasks.
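One way to address this suggestion is to link the delay to a shutdown token. This is a self-contained sketch using plain `Task.Run`/`Task.Delay` in place of the PR's `_taskHelper`/`_scheduler` abstractions, with an assumed `shutdownToken` parameter:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ShutdownTimeoutSketch
{
    // Hypothetical reworking of CancelAfterDelay: the delay observes shutdownToken,
    // so the background task exits early if shutdown completes before the timeout.
    public static Task CancelAfterDelay(
        CancellationTokenSource cts, TimeSpan delay, CancellationToken shutdownToken) =>
        Task.Run(async () =>
        {
            try
            {
                await Task.Delay(delay, shutdownToken);
                cts.Cancel(); // Timeout elapsed: force the next shutdown stage.
            }
            catch (OperationCanceledException)
            {
                // Shutdown finished before the timeout; nothing left to cancel.
            }
        });
}
```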
```csharp
CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler);
// We create the continuation on a new task to avoid the continuation to be executed on this thread
// if `task` is already completed. Note that although the continuation "only equeues", if there's
// a task (i.e. the main loop) waiting to dequeue, the dequeu would happen synchronously after the
```
NIT: dequeu -> dequeue
```diff
- Add(_eventReceiptModAckForExactlyOnceDelivery.WaitAsync(_softStopCts.Token)
-     .ContinueWith(task => ProcessSuccessfulMessages(msgs, msgIds), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler)
+ Add(_eventReceiptModAckForExactlyOnceDelivery.WaitAsync(_handlerCts.Token)
+     .ContinueWith(task => ProcessSuccessfulMessages(msgs, msgIds), _handlerCts.Token, TaskContinuationOptions.ExecuteSynchronously, _taskHelper.TaskScheduler)
```
This `ContinueWith` takes a delegate that itself returns a `Task`, so it produces a `Task<Task>`, where the outer task completes at the first await in `ProcessSuccessfulMessages`. On shutdown this might cause issues: we could shut down before `ProcessSuccessfulMessages` has run to completion, because we think all subtasks are complete. The simplest fix is to unwrap:

```csharp
_eventReceiptModAckForExactlyOnceDelivery.WaitAsync(_handlerCts.Token)
    .ContinueWith(task => ProcessSuccessfulMessages(msgs, msgIds)).Unwrap()
```
or we switch to using the await keyword instead of .ContinueWith
This was here before so maybe a follow-up is fine.
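For reference, a minimal standalone sketch of the `Task<Task>` behavior discussed above (the names are illustrative, not from the PR):

```csharp
using System;
using System.Threading.Tasks;

class UnwrapDemo
{
    static async Task ProcessAsync()
    {
        await Task.Delay(100); // The outer continuation task completes here.
        Console.WriteLine("inner work done");
    }

    static async Task Main()
    {
        // ContinueWith over an async delegate yields Task<Task>.
        Task<Task> outer = Task.CompletedTask.ContinueWith(_ => ProcessAsync());

        await outer; // Completes as soon as ProcessAsync hits its first await.

        // Unwrap() proxies the inner task, so this waits for the real work.
        await outer.Unwrap();
    }
}
```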
```csharp
}

HashSet<string> idsToNack = new HashSet<string>();
foreach (var leasingSet in messageSetsToNack)
```
These aren't necessarily leasing sets, since they may also include expired sets; maybe just `messageSet`.
```csharp
        Backoff = _disconnectBackoff;
    }
}

```
NIT: Blank line
```csharp
}
// Lock ack/nack-queues, this is accessed concurrently here and in "master" thread.
lock (_lock)
if (wasInLeaseTracking)
```
Just want to confirm this logic, since it departs from the previous behavior for non-shutdown scenarios: we won't ack/nack any messages that have hit their max extension period (i.e. 'expired').
```diff
  Add(delayTask, new NextAction(false, hasAcksOrNacks
      ? () => { ackActionToRetry(retryIds); StartPush(); }
-     : () => { extendActionToRetry(extendIds.Where(j => retryIds.Contains(j.Id))); StartPush(); })); 
+     : () => { extendActionToRetry(extendIds.Where(j => retryIds.Contains(j.Id))); StartPush(); }));
```
NIT: White space.
```csharp
{
    lock (_lock)
    {
        _messageSetsInLeasing.Remove(leaseTracking);
```
As a side note for the future: I think for me ideally the message state would be encapsulated within the message object itself, rather than tracking it externally using HashSets in this broader scope. I know changing that would require a significant refactor, so I'm completely fine with the current approach. Just wanted to note it as a potential task to tackle when we have the bandwidth.
```csharp
// if `task` is already completed. Note that although the continuation "only equeues", if there's
// a task (i.e. the main loop) waiting to dequeue, the dequeu would happen synchronously after the
// synchronous enqueue as well as the execution of the next task.
_taskHelper.Run(() =>
```
Just want to confirm: this is the line that fixes the intermittent deadlock we were seeing in the integration tests, is that correct?
```csharp
}
if (acks.Count > 0)
{
    _pushInFlight += acks.Count;
```
I think we should lock around `_pushInFlight`, as it's accessed here, in `HandleAckResponse`, and in the main `StartAsync` loop. Maybe it's worth creating a helper method to reduce the locking duplication, but your call:

```csharp
private void IncrementPushInFlightCount(int increment)
{
    lock (_lock)
    {
        _pushInFlight += increment;
    }
}
```