Skip to content

[KIP-932] - Add usage examples for share consumer applications#2288

Open
Kaushik Raina (k-raina) wants to merge 8 commits into
masterfrom
dev_kip-932_queues-for-kafka_usage_examples
Open

[KIP-932] - Add usage examples for share consumer applications#2288
Kaushik Raina (k-raina) wants to merge 8 commits into
masterfrom
dev_kip-932_queues-for-kafka_usage_examples

Conversation

@k-raina

@k-raina Kaushik Raina (k-raina) commented Jun 19, 2026

Copy link
Copy Markdown
Member

Summary

Adds runnable KIP-932 ShareConsumer examples to examples/, covering both
acknowledgement modes and the common deserialization/lifecycle patterns, plus
a README section indexing them.

Examples

File Demonstrates
share_consumer.py Implicit ack (default mode) — records acked on the next poll()
share_consumer_context_manager.py with statement usage so close() runs automatically on exit
share_consumer_commit_sync.py Explicit per-record ACCEPT/RELEASE/REJECT, committed synchronously with per-partition results
share_consumer_commit_async.py Explicit ack committed asynchronously, with broker outcomes via the ack-commit callback
share_consumer_avro.py DeserializingShareConsumer over Avro/Schema Registry, with poison-record (deserialization failure)

@confluent-cla-assistant

Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Base automatically changed from dev_kip-932_queues-for-kafka to master June 23, 2026 08:53
Copilot AI review requested due to automatic review settings June 23, 2026 11:39
@k-raina Kaushik Raina (k-raina) removed the request for review from Copilot June 23, 2026 11:39
messages = sc.poll(timeout=1.0) # returns a list (possibly empty)
try:
messages = sc.poll(timeout=1.0) # a list, possibly empty
except KafkaException as e:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add IllegalStateException and ConcurrentModificationException handling as well. Only in this file. We will ask user to refer this file for error handling.

Comment thread examples/share_consumer.py Outdated
Comment on lines +64 to +67
# A bad record. In implicit mode you can't ack it by hand
# (acknowledge() is rejected); the library automatically
# retries it (temporary errors) or discards it (permanent
# errors) on the next poll — it is never accepted. Just log it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# A bad record. In implicit mode you can't ack it by hand
# (acknowledge() is rejected); the library automatically
# retries it (temporary errors) or discards it (permanent
# errors) on the next poll — it is never accepted. Just log it.
# the records with msg.error() field set will be acknowledged internally with RELEASE for temporary errors and REJECT for permanent errors. Check [KIP](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255070434#KIP932:QueuesforKafka-Handlingbadrecords) for more details.

Comment thread examples/share_consumer.py Outdated
# errors) on the next poll — it is never accepted. Just log it.
sys.stderr.write('%% Error: %s\n' % msg.error())
continue
sys.stderr.write(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
sys.stderr.write(
sys.stdout.write(

Comment thread examples/share_consumer.py Outdated
try:
messages = sc.poll(timeout=1.0) # a list, possibly empty
except KafkaException as e:
# Re-raise fatal errors; otherwise log and keep going.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Re-raise fatal errors; otherwise log and keep going.
# The consumer should stop consuming after fatal error.

Comment thread examples/share_consumer_avro.py Outdated
Comment on lines +114 to +117
# A record we received but can't decode. In explicit
# mode we still have to ack it — REJECT it as poison so
# it isn't redelivered.
sc.acknowledge(msg, AcknowledgeType.REJECT)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# A record we received but can't decode. In explicit
# mode we still have to ack it — REJECT it as poison so
# it isn't redelivered.
sc.acknowledge(msg, AcknowledgeType.REJECT)
# A record we received but can't decode. In explicit
# mode we still have to ack it — RELEASE it so that
# other consumer can pick and redeliver for processing
# again. In implicit ack mode, we currently don't release
# the record in the Preview internally.
sc.acknowledge(msg, AcknowledgeType.RELEASE)

Comment thread examples/share_consumer_commit_async.py Outdated
Comment on lines +86 to +93
try:
# Your processing goes here.
print(msg.value())
except Exception as e:
# Couldn't process it — RELEASE so it can be retried.
sys.stderr.write('%% Processing failed: %s\n' % e)
sc.acknowledge(msg, AcknowledgeType.RELEASE)
continue

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need for try catch.

Comment thread examples/share_consumer_commit_async.py Outdated
sys.stderr.write('%% Consumer error: %s\n' % e)
continue

for msg in messages:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this to the first try itself.

Comment thread examples/share_consumer_avro.py Outdated
sc.acknowledge(msg, AcknowledgeType.ACCEPT)

# Flush the acks before the next poll().
sc.commit_async()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove as not needed. Poll will handle.

while True:
try:
messages = sc.poll(timeout=1.0) # a list, possibly empty
except KafkaException as e:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add other errors.

Comment on lines +62 to +66
for msg in messages:
if msg.error():
sys.stderr.write('%% Error: %s\n' % msg.error())
continue
print(msg.value())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same move above

Copilot AI review requested due to automatic review settings June 23, 2026 13:06
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_queues-for-kafka_usage_examples branch from 5a7277c to cf488e4 Compare June 23, 2026 13:06
@k-raina Kaushik Raina (k-raina) removed the request for review from Copilot June 23, 2026 13:06
Copilot AI review requested due to automatic review settings June 23, 2026 13:17
@k-raina Kaushik Raina (k-raina) removed the request for review from Copilot June 23, 2026 13:17
Copilot AI review requested due to automatic review settings June 23, 2026 13:38
@k-raina Kaushik Raina (k-raina) requested review from Pranav Rathi (pranavrth) and removed request for Copilot June 23, 2026 13:38
Copilot AI review requested due to automatic review settings June 24, 2026 13:20
@k-raina Kaushik Raina (k-raina) removed the request for review from Copilot June 24, 2026 13:20
@sonarqube-confluent

Copy link
Copy Markdown

Comment on lines +75 to 79
sys.stdout.write(
'%% %s [%d] at offset %d with key %s:\n'
% (msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
)
print(msg.value())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some where it is sys.stdout and some where it is print(). We should be consistent. I think use print itself.

# block; results land in the callback.
sc.commit_async()
except KafkaException as e:
# Re-raise fatal errors; otherwise log and keep going.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add other exception handling here itself.

Comment on lines +110 to +134
for msg in messages:
err = msg.error()
if err is not None:
if err.code() in (KafkaError._KEY_DESERIALIZATION, KafkaError._VALUE_DESERIALIZATION):
# A record we received but can't decode. In explicit
# mode we still have to ack it — RELEASE it so that
# other consumer can pick and redeliver for processing
# again. In implicit ack mode, we currently don't release
# the record in the Preview internally.
sc.acknowledge(msg, AcknowledgeType.RELEASE)
else:
# Any other flagged record is acked internally by the
# library — see share_consumer.py for the error handling.
sys.stderr.write('%% Error: %s\n' % err)
continue

# value is already deserialized.
user = msg.value()
if user is not None:
print(
"User record {}: name: {}, favorite_number: {}, favorite_color: {}".format(
msg.key(), user.name, user.favorite_number, user.favorite_color
)
)
sc.acknowledge(msg, AcknowledgeType.ACCEPT)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this to try block.

Comment on lines 68 to 79
for msg in messages:
if msg.error():
# Per-message errors are informational; log and keep
# polling. Truly fatal errors are raised out of poll()
# itself via the error_cb path.
# The records with msg.error() field set will be acknowledged
# internally with RELEASE for temporary errors and REJECT for
# permanent errors. Check KIP for more details.
sys.stderr.write('%% Error: %s\n' % msg.error())
continue
sys.stderr.write(
sys.stdout.write(
'%% %s [%d] at offset %d with key %s:\n'
% (msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
)
print(msg.value())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this to try block.

try:
while True:
try:
batch = sc.poll(timeout=1.0) # a list, possibly empty

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use messages instead of batch.

Comment on lines +69 to +97
for msg in batch:
if msg.error():
# No need to acknowledge a flagged record — the library
# already handles it. Acking it yourself is redundant and
# can override a permanent discard with a retry. Just log it.
sys.stderr.write('%% Error: %s\n' % msg.error())
continue

try:
# Your processing goes here.
print(msg.value())
except Exception as e:
# Couldn't process it — RELEASE so it can be retried.
sys.stderr.write('%% Processing failed: %s\n' % e)
sc.acknowledge(msg, AcknowledgeType.RELEASE)
continue

sc.acknowledge(msg, AcknowledgeType.ACCEPT)

if not batch:
continue

# Flush the acks before the next poll().
results = sc.commit_sync(timeout=10.0)
for topic_partition, err in results.items():
if err is not None:
sys.stderr.write(
'%% Commit failed for %s [%d]: %s\n' % (topic_partition.topic, topic_partition.partition, err)
)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to try block.

Comment on lines +76 to +77
# already handles it. Acking it yourself is redundant and
# can override a permanent discard with a retry. Just log it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Library already acknowledges the errored records internally exception for Deserialization error where it needs to be acknowledged explicitly from the application (Will be consistent in GA). You can choose to override the acknowledge for the errored offsets if required.

Comment on lines +47 to +48
# ShareConsumer configuration.
# See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants