What would you like to happen?
What needs to happen?
In KafkaCommitOffset.CommitOffsetDoFn.processElement(), when
consumer.commitSync() fails, the exception is caught and logged
as a warning but no retry is attempted:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L87-L90
There is an existing TODO comment: // TODO: consider retrying.
Transient failures (e.g., TimeoutException,
RetriableCommitFailedException) during offset commits can leave
offsets uncommitted, potentially leading to duplicate processing
after a restart. Retrying retriable exceptions with backoff would
improve reliability.
Proposed approach
- Add retry with exponential backoff for retriable Kafka exceptions
- Keep current warn-and-skip behavior for non-retriable exceptions
- Use a bounded number of retries (e.g., 3 attempts) to avoid
blocking indefinitely
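
The three points above could be sketched roughly as follows. This is only an illustration, not the actual Beam code: `CommitRetrySketch`, `commitWithRetry`, and `TransientCommitException` are hypothetical names, and the sketch uses only the JDK so it can run on its own. In the real DoFn the `commit` argument would presumably wrap the `consumer.commitSync()` call, and the predicate would presumably check for Kafka's retriable exception types.

```java
import java.util.function.Predicate;

/** Hypothetical helper; none of these names exist in Beam or Kafka. */
final class CommitRetrySketch {

  /** Stand-in for a retriable Kafka exception, so the sketch needs no Kafka dependency. */
  static final class TransientCommitException extends RuntimeException {
    TransientCommitException(String message) {
      super(message);
    }
  }

  /**
   * Runs {@code commit} up to {@code maxAttempts} times, doubling the sleep between attempts.
   * Returns true if a commit succeeded, false if it was skipped (warn-and-skip).
   */
  static boolean commitWithRetry(
      Runnable commit,
      Predicate<RuntimeException> isRetriable,
      int maxAttempts,
      long initialBackoffMillis) {
    long backoffMillis = initialBackoffMillis;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        commit.run();
        return true;
      } catch (RuntimeException e) {
        boolean lastAttempt = attempt == maxAttempts;
        if (!isRetriable.test(e) || lastAttempt) {
          // Same outcome as the current code: log a warning and move on.
          System.err.println(
              "Offset commit failed on attempt " + attempt + ", skipping: " + e);
          return false;
        }
        try {
          Thread.sleep(backoffMillis);
        } catch (InterruptedException ie) {
          // Preserve the interrupt flag and stop retrying.
          Thread.currentThread().interrupt();
          return false;
        }
        backoffMillis *= 2; // exponential backoff
      }
    }
    return false; // only reached when maxAttempts < 1
  }
}
```

With `maxAttempts = 3` and an initial backoff of 100 ms, the worst case adds about 300 ms of sleep (100 ms + 200 ms) before the element is skipped, so the DoFn cannot block indefinitely.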
Issue Priority
Priority: 3 (nice-to-have improvement)
Issue Components