diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb index 814ed067ee7d..7cb4e8af6d5a 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener.rb @@ -140,11 +140,15 @@ def start # # @return [MessageListener] returns self so calls can be chained. # - def stop + def stop shutdown_setting: :wait_for_processing, timeout: nil + unless [:wait_for_processing, :nack_immediately].include? shutdown_setting + raise ArgumentError, "Invalid shutdown_setting: #{shutdown_setting}" + end + synchronize do @started = false @stopped = true - @stream_pool.map(&:stop) + @stream_pool.each { |s| s.stop shutdown_setting: shutdown_setting, timeout: timeout } wait_stop_buffer_thread! self end @@ -182,8 +186,8 @@ def wait! timeout = nil # # @return [MessageListener] returns self so calls can be chained. # - def stop! timeout = nil - stop + def stop! timeout = nil, shutdown_setting: :wait_for_processing + stop shutdown_setting: shutdown_setting, timeout: timeout wait! timeout end diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb index 5dca89f5665e..0dffac30f2ce 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/inventory.rb @@ -109,6 +109,21 @@ def empty? end end + def wait_until_empty timeout = nil + synchronize do + if timeout + target_time = Time.now + timeout + while !@inventory.empty? + remaining = target_time - Time.now + break if remaining <= 0 + @wait_cond.wait remaining + end + else + @wait_cond.wait_while { !@inventory.empty? } + end + end + end + def start @background_thread ||= Thread.new { background_run } diff --git a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb index fa563dd17f57..45e56e3896d5 100644 --- a/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb +++ b/google-cloud-pubsub/lib/google/cloud/pubsub/message_listener/stream.rb @@ -57,7 +57,8 @@ def initialize subscriber @subscriber = subscriber @request_queue = nil - @stopped = nil + @streaming_stopped = nil + @fully_stopped = nil @paused = nil @pause_cond = new_cond @exactly_once_delivery_enabled = false @@ -93,9 +94,9 @@ def start self end - def stop + def stop shutdown_setting: :wait_for_processing, timeout: nil synchronize do - break if @stopped + break if @streaming_stopped subscriber.service.logger.log :info, "subscriber-streams" do "stopping stream for subscription #{@subscriber.subscription_name}" @@ -105,23 +106,63 @@ def stop @request_queue&.push self # Signal to the background thread that we are stopped. - @stopped = true + @streaming_stopped = true @pause_cond.broadcast - # Now that the reception thread is stopped, immediately stop the - # callback thread pool. All queued callbacks will see the stream - # is stopped and perform a noop. - @callback_thread_pool.shutdown - - # Once all the callbacks are stopped, we can stop the inventory. - @inventory.stop + if shutdown_setting == :nack_immediately + nack_unprocessed_messages! + @fully_stopped = true + @callback_thread_pool.shutdown + @inventory.stop + else + # :wait_for_processing + @shutdown_thread = Thread.new do + if timeout + wait_time = [timeout - 30, 0].max + @inventory.wait_until_empty wait_time + if !@inventory.empty? + nack_unprocessed_messages! + end + else + @inventory.wait_until_empty nil + end + synchronize do + @fully_stopped = true + @callback_thread_pool.shutdown + @inventory.stop + end + end + end end self end + def nack_unprocessed_messages! + synchronize do + ack_ids = @inventory.ack_ids + puts "nack_unprocessed_messages! called, ack_ids: #{ack_ids}" + unless ack_ids.empty? + begin + subscriber.service.modify_ack_deadline subscriber.subscription_name, ack_ids, 0 + puts "modify_ack_deadline succeeded" + rescue StandardError => e + puts "modify_ack_deadline failed: #{e.message}" + subscriber.service.logger.log :error, "subscriber-streams" do + "Failed to nack unprocessed messages: #{e.message}" + end + end + @inventory.remove *ack_ids + end + end + end + def stopped? - synchronize { @stopped } + synchronize { @streaming_stopped } + end + + def fully_stopped? + synchronize { @fully_stopped } end def paused? @@ -133,6 +174,7 @@ def running? end def wait! timeout = nil + @shutdown_thread&.join timeout # Wait for all queued callbacks to be processed. @callback_thread_pool.wait_for_termination timeout @@ -222,7 +264,7 @@ class RestartStream < StandardError; end def background_run synchronize do # Don't allow a stream to restart if already stopped - if @stopped + if @streaming_stopped subscriber.service.logger.log :debug, "subscriber-streams" do "not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \ " stopped" @@ -230,7 +272,7 @@ def background_run return end - @stopped = false + @streaming_stopped = false @paused = false # signal to the previous queue to shut down @@ -252,14 +294,14 @@ def background_run loop do synchronize do - if @paused && !@stopped + if @paused && !@streaming_stopped @pause_cond.wait next end end # Break loop, close thread if stopped - break if synchronize { @stopped } + break if synchronize { @streaming_stopped } begin # Cannot synchronize the enumerator, causes deadlock @@ -297,7 +339,7 @@ def background_run # Has the loop broken but we aren't stopped? # Could be GRPC has thrown an internal error, so restart. - raise RestartStream unless synchronize { @stopped } + raise RestartStream unless synchronize { @streaming_stopped } # We must be stopped, tell the stream to quit. stop @@ -372,7 +414,7 @@ def perform_callback_sync rec_msg subscriber.service.logger.log :info, "callback-delivery" do "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks" end - @subscriber.callback.call rec_msg unless stopped? + @subscriber.callback.call rec_msg unless fully_stopped? rescue StandardError => e subscriber.service.logger.log :info, "callback-exceptions" do "message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \ diff --git a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb index 5f457b90405c..eed419ec74d4 100644 --- a/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb +++ b/google-cloud-pubsub/test/google/cloud/pubsub/message_listener/stream_test.rb @@ -217,4 +217,73 @@ def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds: listener.stop listener.wait! end + + it "should nack unprocessed messages when stopped with nack_immediately" do + pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] + response_groups = [[pull_res1]] + + stub = StreamingPullStub.new response_groups + nacked = false + + subscriber.service.mocked_subscription_admin = stub + def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds: + puts "modify_ack_deadline called with #{ack_deadline_seconds}" + if ack_deadline_seconds == 0 + nacked = true + end + nil + end + + listener = subscriber.listen streams: 1 do |msg| + puts "callback started" + sleep 0.5 + puts "callback finished" + end + + listener.start + + # Wait for message to be pulled and added to inventory + sleep 0.1 + puts "inventory count: #{listener.stream_pool.first.inventory.count}" + + listener.stop shutdown_setting: :nack_immediately + puts "stop called" + listener.wait! + puts "wait finished" + + assert nacked + end + + it "should wait for processing when stopped with wait_for_processing" do + pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc] + response_groups = [[pull_res1]] + + stub = StreamingPullStub.new response_groups + called = false + nacked = false + + subscriber.service.mocked_subscription_admin = stub + def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds: + if ack_deadline_seconds == 0 + nacked = true + end + nil + end + + listener = subscriber.listen streams: 1 do |msg| + sleep 0.5 + called = true + end + + listener.start + + # Wait for message to be pulled and added to inventory + sleep 0.1 + + listener.stop shutdown_setting: :wait_for_processing + listener.wait! + + assert called + refute nacked + end end