Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,15 @@ def start
#
# @return [MessageListener] returns self so calls can be chained.
#
def stop
def stop shutdown_setting: :wait_for_processing, timeout: nil
unless [:wait_for_processing, :nack_immediately].include? shutdown_setting
raise ArgumentError, "Invalid shutdown_setting: #{shutdown_setting}"
end

synchronize do
@started = false
@stopped = true
@stream_pool.map(&:stop)
@stream_pool.each { |s| s.stop shutdown_setting: shutdown_setting, timeout: timeout }
wait_stop_buffer_thread!
self
end
Expand Down Expand Up @@ -182,8 +186,8 @@ def wait! timeout = nil
#
# @return [MessageListener] returns self so calls can be chained.
#
def stop! timeout = nil
stop
def stop! timeout = nil, shutdown_setting: :wait_for_processing
stop shutdown_setting: shutdown_setting, timeout: timeout
wait! timeout
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,21 @@ def empty?
end
end

def wait_until_empty timeout = nil
synchronize do
if timeout
target_time = Time.now + timeout
while !@inventory.empty?
remaining = target_time - Time.now
break if remaining <= 0
@wait_cond.wait remaining
end
else
@wait_cond.wait_while { !@inventory.empty? }
end
end
end

def start
@background_thread ||= Thread.new { background_run }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ def initialize subscriber
@subscriber = subscriber

@request_queue = nil
@stopped = nil
@streaming_stopped = nil
@fully_stopped = nil
@paused = nil
@pause_cond = new_cond
@exactly_once_delivery_enabled = false
Expand Down Expand Up @@ -93,9 +94,9 @@ def start
self
end

def stop
def stop shutdown_setting: :wait_for_processing, timeout: nil
synchronize do
break if @stopped
break if @streaming_stopped

subscriber.service.logger.log :info, "subscriber-streams" do
"stopping stream for subscription #{@subscriber.subscription_name}"
Expand All @@ -105,23 +106,63 @@ def stop
@request_queue&.push self

# Signal to the background thread that we are stopped.
@stopped = true
@streaming_stopped = true
@pause_cond.broadcast

# Now that the reception thread is stopped, immediately stop the
# callback thread pool. All queued callbacks will see the stream
# is stopped and perform a noop.
@callback_thread_pool.shutdown

# Once all the callbacks are stopped, we can stop the inventory.
@inventory.stop
if shutdown_setting == :nack_immediately
nack_unprocessed_messages!
@fully_stopped = true
@callback_thread_pool.shutdown
@inventory.stop
else
# :wait_for_processing
@shutdown_thread = Thread.new do
if timeout
wait_time = [timeout - 30, 0].max
@inventory.wait_until_empty wait_time
if !@inventory.empty?
nack_unprocessed_messages!
end
else
@inventory.wait_until_empty nil
end
synchronize do
@fully_stopped = true
@callback_thread_pool.shutdown
@inventory.stop
end
end
end
end

self
end

def nack_unprocessed_messages!
synchronize do
ack_ids = @inventory.ack_ids
puts "nack_unprocessed_messages! called, ack_ids: #{ack_ids}"
unless ack_ids.empty?
begin
subscriber.service.modify_ack_deadline subscriber.subscription_name, ack_ids, 0
puts "modify_ack_deadline succeeded"
rescue StandardError => e
puts "modify_ack_deadline failed: #{e.message}"
subscriber.service.logger.log :error, "subscriber-streams" do
"Failed to nack unprocessed messages: #{e.message}"
end
end
@inventory.remove *ack_ids
end
end
end

def stopped?
synchronize { @stopped }
synchronize { @streaming_stopped }
end

def fully_stopped?
synchronize { @fully_stopped }
end

def paused?
Expand All @@ -133,6 +174,7 @@ def running?
end

def wait! timeout = nil
@shutdown_thread&.join timeout
# Wait for all queued callbacks to be processed.
@callback_thread_pool.wait_for_termination timeout

Expand Down Expand Up @@ -222,15 +264,15 @@ class RestartStream < StandardError; end
def background_run
synchronize do
# Don't allow a stream to restart if already stopped
if @stopped
if @streaming_stopped
subscriber.service.logger.log :debug, "subscriber-streams" do
"not filling stream for subscription #{@subscriber.subscription_name} because stream is already" \
" stopped"
end
return
end

@stopped = false
@streaming_stopped = false
@paused = false

# signal to the previous queue to shut down
Expand All @@ -252,14 +294,14 @@ def background_run

loop do
synchronize do
if @paused && !@stopped
if @paused && !@streaming_stopped
@pause_cond.wait
next
end
end

# Break loop, close thread if stopped
break if synchronize { @stopped }
break if synchronize { @streaming_stopped }

begin
# Cannot synchronize the enumerator, causes deadlock
Expand Down Expand Up @@ -297,7 +339,7 @@ def background_run

# Has the loop broken but we aren't stopped?
# Could be GRPC has thrown an internal error, so restart.
raise RestartStream unless synchronize { @stopped }
raise RestartStream unless synchronize { @streaming_stopped }

# We must be stopped, tell the stream to quit.
stop
Expand Down Expand Up @@ -372,7 +414,7 @@ def perform_callback_sync rec_msg
subscriber.service.logger.log :info, "callback-delivery" do
"message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) delivery to user callbacks"
end
@subscriber.callback.call rec_msg unless stopped?
@subscriber.callback.call rec_msg unless fully_stopped?
rescue StandardError => e
subscriber.service.logger.log :info, "callback-exceptions" do
"message (ID #{rec_msg.message_id}, ackID #{rec_msg.ack_id}) caused a user callback exception: " \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,73 @@ def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
listener.stop
listener.wait!
end

it "should nack unprocessed messages when stopped with nack_immediately" do
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
response_groups = [[pull_res1]]

stub = StreamingPullStub.new response_groups
nacked = false

subscriber.service.mocked_subscription_admin = stub
def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
puts "modify_ack_deadline called with #{ack_deadline_seconds}"
if ack_deadline_seconds == 0
nacked = true
end
nil
end

listener = subscriber.listen streams: 1 do |msg|
puts "callback started"
sleep 0.5
puts "callback finished"
end

listener.start

# Wait for message to be pulled and added to inventory
sleep 0.1
puts "inventory count: #{listener.stream_pool.first.inventory.count}"

listener.stop shutdown_setting: :nack_immediately
puts "stop called"
listener.wait!
puts "wait finished"

assert nacked
end

it "should wait for processing when stopped with wait_for_processing" do
pull_res1 = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc]
response_groups = [[pull_res1]]

stub = StreamingPullStub.new response_groups
called = false
nacked = false

subscriber.service.mocked_subscription_admin = stub
def stub.modify_ack_deadline subscription:, ack_ids:, ack_deadline_seconds:
if ack_deadline_seconds == 0
nacked = true
end
nil
end

listener = subscriber.listen streams: 1 do |msg|
sleep 0.5
called = true
end

listener.start

# Wait for message to be pulled and added to inventory
sleep 0.1

listener.stop shutdown_setting: :wait_for_processing
listener.wait!

assert called
refute nacked
end
end
Loading