[KIP-932] - Add usage examples for share consumer applications#2288
[KIP-932] - Add usage examples for share consumer applications#2288Kaushik Raina (k-raina) wants to merge 8 commits into
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
| messages = sc.poll(timeout=1.0) # returns a list (possibly empty) | ||
| try: | ||
| messages = sc.poll(timeout=1.0) # a list, possibly empty | ||
| except KafkaException as e: |
There was a problem hiding this comment.
Add IllegalStateException and ConcurrentModificationException handling as well. Only in this file. We will ask user to refer this file for error handling.
| # A bad record. In implicit mode you can't ack it by hand | ||
| # (acknowledge() is rejected); the library automatically | ||
| # retries it (temporary errors) or discards it (permanent | ||
| # errors) on the next poll — it is never accepted. Just log it. |
There was a problem hiding this comment.
| # A bad record. In implicit mode you can't ack it by hand | |
| # (acknowledge() is rejected); the library automatically | |
| # retries it (temporary errors) or discards it (permanent | |
| # errors) on the next poll — it is never accepted. Just log it. | |
| # the records with msg.error() field set will be acknowledged internally with RELEASE for temporary errors and REJECT for permanent errors. Check [KIP](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255070434#KIP932:QueuesforKafka-Handlingbadrecords) for more details. |
| # errors) on the next poll — it is never accepted. Just log it. | ||
| sys.stderr.write('%% Error: %s\n' % msg.error()) | ||
| continue | ||
| sys.stderr.write( |
There was a problem hiding this comment.
| sys.stderr.write( | |
| sys.stdout.write( |
| try: | ||
| messages = sc.poll(timeout=1.0) # a list, possibly empty | ||
| except KafkaException as e: | ||
| # Re-raise fatal errors; otherwise log and keep going. |
There was a problem hiding this comment.
| # Re-raise fatal errors; otherwise log and keep going. | |
| # The consumer should stop consuming after fatal error. |
| # A record we received but can't decode. In explicit | ||
| # mode we still have to ack it — REJECT it as poison so | ||
| # it isn't redelivered. | ||
| sc.acknowledge(msg, AcknowledgeType.REJECT) |
There was a problem hiding this comment.
| # A record we received but can't decode. In explicit | |
| # mode we still have to ack it — REJECT it as poison so | |
| # it isn't redelivered. | |
| sc.acknowledge(msg, AcknowledgeType.REJECT) | |
| # A record we received but can't decode. In explicit | |
| # mode we still have to ack it — RELEASE it so that | |
| # other consumer can pick and redeliver for processing | |
| # again. In implicit ack mode, we currently don't release | |
| # the record in the Preview internally. | |
| sc.acknowledge(msg, AcknowledgeType.RELEASE) |
| try: | ||
| # Your processing goes here. | ||
| print(msg.value()) | ||
| except Exception as e: | ||
| # Couldn't process it — RELEASE so it can be retried. | ||
| sys.stderr.write('%% Processing failed: %s\n' % e) | ||
| sc.acknowledge(msg, AcknowledgeType.RELEASE) | ||
| continue |
There was a problem hiding this comment.
no need for try catch.
| sys.stderr.write('%% Consumer error: %s\n' % e) | ||
| continue | ||
|
|
||
| for msg in messages: |
There was a problem hiding this comment.
Let's move this to the first try itself.
| sc.acknowledge(msg, AcknowledgeType.ACCEPT) | ||
|
|
||
| # Flush the acks before the next poll(). | ||
| sc.commit_async() |
There was a problem hiding this comment.
remove as not needed. Poll will handle.
| while True: | ||
| try: | ||
| messages = sc.poll(timeout=1.0) # a list, possibly empty | ||
| except KafkaException as e: |
There was a problem hiding this comment.
add other errors.
| for msg in messages: | ||
| if msg.error(): | ||
| sys.stderr.write('%% Error: %s\n' % msg.error()) | ||
| continue | ||
| print(msg.value()) |
There was a problem hiding this comment.
same move above
5a7277c to
cf488e4
Compare
|
| sys.stdout.write( | ||
| '%% %s [%d] at offset %d with key %s:\n' | ||
| % (msg.topic(), msg.partition(), msg.offset(), str(msg.key())) | ||
| ) | ||
| print(msg.value()) |
There was a problem hiding this comment.
Some where it is sys.stdout and some where it is print(). We should be consistent. I think use print itself.
| # block; results land in the callback. | ||
| sc.commit_async() | ||
| except KafkaException as e: | ||
| # Re-raise fatal errors; otherwise log and keep going. |
There was a problem hiding this comment.
Add other exception handling here itself.
| for msg in messages: | ||
| err = msg.error() | ||
| if err is not None: | ||
| if err.code() in (KafkaError._KEY_DESERIALIZATION, KafkaError._VALUE_DESERIALIZATION): | ||
| # A record we received but can't decode. In explicit | ||
| # mode we still have to ack it — RELEASE it so that | ||
| # other consumer can pick and redeliver for processing | ||
| # again. In implicit ack mode, we currently don't release | ||
| # the record in the Preview internally. | ||
| sc.acknowledge(msg, AcknowledgeType.RELEASE) | ||
| else: | ||
| # Any other flagged record is acked internally by the | ||
| # library — see share_consumer.py for the error handling. | ||
| sys.stderr.write('%% Error: %s\n' % err) | ||
| continue | ||
|
|
||
| # value is already deserialized. | ||
| user = msg.value() | ||
| if user is not None: | ||
| print( | ||
| "User record {}: name: {}, favorite_number: {}, favorite_color: {}".format( | ||
| msg.key(), user.name, user.favorite_number, user.favorite_color | ||
| ) | ||
| ) | ||
| sc.acknowledge(msg, AcknowledgeType.ACCEPT) |
There was a problem hiding this comment.
Let's move this to try block.
| for msg in messages: | ||
| if msg.error(): | ||
| # Per-message errors are informational; log and keep | ||
| # polling. Truly fatal errors are raised out of poll() | ||
| # itself via the error_cb path. | ||
| # The records with msg.error() field set will be acknowledged | ||
| # internally with RELEASE for temporary errors and REJECT for | ||
| # permanent errors. Check KIP for more details. | ||
| sys.stderr.write('%% Error: %s\n' % msg.error()) | ||
| continue | ||
| sys.stderr.write( | ||
| sys.stdout.write( | ||
| '%% %s [%d] at offset %d with key %s:\n' | ||
| % (msg.topic(), msg.partition(), msg.offset(), str(msg.key())) | ||
| ) | ||
| print(msg.value()) |
There was a problem hiding this comment.
Let's move this to try block.
| try: | ||
| while True: | ||
| try: | ||
| batch = sc.poll(timeout=1.0) # a list, possibly empty |
There was a problem hiding this comment.
use messages instead of batch.
| for msg in batch: | ||
| if msg.error(): | ||
| # No need to acknowledge a flagged record — the library | ||
| # already handles it. Acking it yourself is redundant and | ||
| # can override a permanent discard with a retry. Just log it. | ||
| sys.stderr.write('%% Error: %s\n' % msg.error()) | ||
| continue | ||
|
|
||
| try: | ||
| # Your processing goes here. | ||
| print(msg.value()) | ||
| except Exception as e: | ||
| # Couldn't process it — RELEASE so it can be retried. | ||
| sys.stderr.write('%% Processing failed: %s\n' % e) | ||
| sc.acknowledge(msg, AcknowledgeType.RELEASE) | ||
| continue | ||
|
|
||
| sc.acknowledge(msg, AcknowledgeType.ACCEPT) | ||
|
|
||
| if not batch: | ||
| continue | ||
|
|
||
| # Flush the acks before the next poll(). | ||
| results = sc.commit_sync(timeout=10.0) | ||
| for topic_partition, err in results.items(): | ||
| if err is not None: | ||
| sys.stderr.write( | ||
| '%% Commit failed for %s [%d]: %s\n' % (topic_partition.topic, topic_partition.partition, err) | ||
| ) |
There was a problem hiding this comment.
Move this to try block.
| # already handles it. Acking it yourself is redundant and | ||
| # can override a permanent discard with a retry. Just log it. |
There was a problem hiding this comment.
Library already acknowledges the errored records internally exception for Deserialization error where it needs to be acknowledged explicitly from the application (Will be consistent in GA). You can choose to override the acknowledge for the errored offsets if required.
| # ShareConsumer configuration. | ||
| # See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md |
There was a problem hiding this comment.
Let's remove this.





Summary
Adds runnable KIP-932 ShareConsumer examples to
examples/, covering bothacknowledgement modes and the common deserialization/lifecycle patterns, plus
a README section indexing them.
Examples
share_consumer.pypoll()share_consumer_context_manager.pywithstatement usage soclose()runs automatically on exitshare_consumer_commit_sync.pyshare_consumer_commit_async.pyshare_consumer_avro.pyDeserializingShareConsumerover Avro/Schema Registry, with poison-record (deserialization failure)