Skip to content

Commit bc0efbe

Browse files
committed
dont assume subscription names, you may have many for the same topic for fanout
1 parent 5d2231c commit bc0efbe

3 files changed

Lines changed: 16 additions & 33 deletions

File tree

README.md

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,14 @@ And then execute:
1818

1919
### Conventions
2020

21-
- Subscription names follow the format `"#{topic_name}-subscription"`
2221
- Messages are hashes
23-
- Message hashes are encoded to JSON for transport and decoded back to hashes when reading
2422

2523
### Receiving Messages
2624

2725
```ruby
2826
reader = Hover::PubSub::Reader.new(
2927
project_id: ENV['GCP_PUBSUB_PROJECT_ID'],
30-
topic_names: ['list', 'of', 'topic', 'names'],
28+
subscription_names: ['list', 'of', 'subscription', 'names'],
3129
ack_deadline: 30
3230
)
3331

@@ -38,12 +36,9 @@ end
3836

3937
A `Reader` instance has a `#read` instance method that takes a block. The block is responsible for processing each message. If the block returns true, processing is considered successful and the message is acknowledged and deleted. If the block returns false the message goes back to the queue for another reader to attempt processing again.
4038

41-
When the `#read` method is called a thread is created for each topic subscription. And all subscriptions are read from concurrently. It is safe to have more than one reader reading at the same time. With that you can scale up the number of active readers as the number of messages needing to be processed grows.
42-
43-
`#read` does not yield the [received message](https://googleapis.dev/ruby/google-cloud-pubsub/latest/Google/Cloud/PubSub/ReceivedMessage.html) objects to your block. It assumes your messages are JSON strings and decodes them and returns the decoded object.
44-
45-
`Reader` assumes your subscription names follow the pattern `"#{topic_name}-subscription"`.
39+
When the `#read` method is called a thread is created for each subscription. And all subscriptions are read from concurrently. It is safe to have more than one reader reading at the same time. With that you can scale up the number of active readers as the number of messages needing to be processed grows.
4640

41+
`#read` does not yield the [received message](https://googleapis.dev/ruby/google-cloud-pubsub/latest/Google/Cloud/PubSub/ReceivedMessage.html) objects to your block. It assumes your messages are JSON strings and decodes them and returns the decoded object.
4742

4843
### Publishing Messages
4944

lib/hover/pub_sub/reader.rb

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,21 @@
66
module Hover
77
module PubSub
88
class Reader
9-
def self.subscription_name(topic_name)
10-
"#{topic_name}-subscription"
11-
end
12-
139
def self.parse_message(message)
1410
json = message.data
1511
JSON.parse(json)
1612
end
1713

18-
def initialize(project_id:, topic_names:, ack_deadline:)
14+
def initialize(project_id:, subscription_names:, ack_deadline:)
1915
@project_id = project_id
20-
@topic_names = topic_names
16+
@subscription_names = subscription_names
2117
@ack_deadline = ack_deadline
2218
end
2319

2420
def read
25-
for_each_new_message do |topic_name, message|
21+
for_each_new_message do |subscription_name, message|
2622
parsed_message = self.class.parse_message(message)
27-
processed_successfully = yield(topic_name, parsed_message).eql?(true)
23+
processed_successfully = yield(subscription_name, parsed_message).eql?(true)
2824

2925
delete message if processed_successfully
3026
end
@@ -33,10 +29,10 @@ def read
3329
private
3430

3531
def for_each_new_message
36-
threads = subscriptions.map do |topic_name, subscription|
32+
threads = subscriptions.map do |subscription_name, subscription|
3733
Thread.new do
3834
subscription.pull(immediate: false).each do |message|
39-
yield(topic_name, message)
35+
yield(subscription_name, message)
4036
end
4137
end
4238
end
@@ -50,18 +46,10 @@ def delete(message)
5046
end
5147

5248
def subscriptions
53-
@topic_names.map.with_object({}) do |topic_name, hash|
54-
project.topic(topic_name, skip_lookup: true)
55-
56-
hash[topic_name] = subscription(topic_name)
57-
end
58-
end
59-
60-
def subscription(topic_name)
61-
name = self.class.subscription_name(topic_name)
62-
63-
project.subscription(name, skip_lookup: true).tap do |subscription|
64-
subscription.deadline = @ack_deadline
49+
@subscription_names.map.with_object({}) do |name, hash|
50+
hash[name] = project.subscription(name, skip_lookup: true).tap do |subscription|
51+
subscription.deadline = @ack_deadline
52+
end
6553
end
6654
end
6755

spec/hover/pub_sub/reader_spec.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
let(:instance) do
3232
described_class.new(
3333
project_id: @pubsub_project_id,
34-
topic_names: [topic_name],
34+
subscription_names: [subscription_name],
3535
ack_deadline: 1
3636
)
3737
end
@@ -55,8 +55,8 @@
5555
expect(messages.size).to eq(1)
5656
expect(messages.first).to eq(sent_message)
5757

58-
instance.read do |topic_name, message|
59-
raise "No messages expected got #{message.inspect} on topic #{topic_name}"
58+
instance.read do |subscription_name, message|
59+
raise "No messages expected got #{message.inspect} on subscriptions #{subscription_name}"
6060
end
6161
end
6262
end

0 commit comments

Comments
 (0)